diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java index 3c1fb4b6ade62..dcdf33f7216aa 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java @@ -89,7 +89,7 @@ import org.opensearch.index.mapper.MapperService; import org.opensearch.index.mapper.MapperService.MergeReason; import org.opensearch.index.query.QueryShardContext; -import org.opensearch.index.remote.RemoteStoreCustomDataResolver; +import org.opensearch.index.remote.RemoteStoreCustomMetadataResolver; import org.opensearch.index.remote.RemoteStoreEnums; import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm; import org.opensearch.index.remote.RemoteStoreEnums.PathType; @@ -178,7 +178,7 @@ public class MetadataCreateIndexService { private AwarenessReplicaBalance awarenessReplicaBalance; @Nullable - private final RemoteStoreCustomDataResolver remoteStoreCustomDataResolver; + private final RemoteStoreCustomMetadataResolver remoteStoreCustomMetadataResolver; public MetadataCreateIndexService( final Settings settings, @@ -213,8 +213,8 @@ public MetadataCreateIndexService( // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. createIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_INDEX_KEY, true); Supplier minNodeVersionSupplier = () -> clusterService.state().nodes().getMinNodeVersion(); - remoteStoreCustomDataResolver = isRemoteDataAttributePresent(settings) - ? new RemoteStoreCustomDataResolver(remoteStoreSettings, minNodeVersionSupplier) + remoteStoreCustomMetadataResolver = isRemoteDataAttributePresent(settings) + ? new RemoteStoreCustomMetadataResolver(remoteStoreSettings, minNodeVersionSupplier) : null; } @@ -563,7 +563,7 @@ IndexMetadata buildAndValidateTemporaryIndexMetadata( tmpImdBuilder.setRoutingNumShards(routingNumShards); tmpImdBuilder.settings(indexSettings); tmpImdBuilder.system(isSystem); - addRemoteStoreCustomData(tmpImdBuilder, true); + addRemoteStoreCustomMetadata(tmpImdBuilder, true); // Set up everything, now locally create the index to see that things are ok, and apply IndexMetadata tempMetadata = tmpImdBuilder.build(); @@ -573,13 +573,13 @@ IndexMetadata buildAndValidateTemporaryIndexMetadata( } /** - * Adds the remote store path type information in custom data of index metadata. + * Adds the 1) remote store path type 2) ckp as translog metadata information in custom data of index metadata. * * @param tmpImdBuilder index metadata builder. * @param assertNullOldType flag to verify that the old remote store path type is null */ - public void addRemoteStoreCustomData(IndexMetadata.Builder tmpImdBuilder, boolean assertNullOldType) { - if (remoteStoreCustomDataResolver == null) { + public void addRemoteStoreCustomMetadata(IndexMetadata.Builder tmpImdBuilder, boolean assertNullOldType) { + if (remoteStoreCustomMetadataResolver == null) { return; } // It is possible that remote custom data exists already. In such cases, we need to only update the path type @@ -587,13 +587,14 @@ public void addRemoteStoreCustomData(IndexMetadata.Builder tmpImdBuilder, boolea Map existingCustomData = tmpImdBuilder.removeCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY); assert assertNullOldType == false || Objects.isNull(existingCustomData); - // Determine the path type for use using the remoteStorePathResolver. - RemoteStorePathStrategy newPathStrategy = remoteStoreCustomDataResolver.get(); Map remoteCustomData = new HashMap<>(); - boolean translocCkpAsMetadataUploadAllowed = remoteStoreCustomDataResolver.getRemoteStoreTranslogCkpAsMetadataAllowed(); - remoteCustomData.put(RemoteStoreEnums.CKP_AS_METADATA, Boolean.toString(translocCkpAsMetadataUploadAllowed)); + // Determine if the ckp would be stored as translog metadata + boolean isCkpAsTranslogMetadata = remoteStoreCustomMetadataResolver.isCkpAsTranslogMetadata(); + remoteCustomData.put(RemoteStoreEnums.CKP_AS_METADATA, Boolean.toString(isCkpAsTranslogMetadata)); + // Determine the path type for use using the remoteStorePathResolver. + RemoteStorePathStrategy newPathStrategy = remoteStoreCustomMetadataResolver.getPathStrategy(); remoteCustomData.put(PathType.NAME, newPathStrategy.getType().name()); if (Objects.nonNull(newPathStrategy.getHashAlgorithm())) { remoteCustomData.put(PathHashAlgorithm.NAME, newPathStrategy.getHashAlgorithm().name()); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java index 384c7a63f92a4..2431f57a6a1f9 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/IndexMetadataUpdater.java @@ -176,7 +176,7 @@ public Metadata applyChanges(Metadata oldMetadata, RoutingTable newRoutingTable, oldMetadata.settings(), logger ); - migrationImdUpdater.maybeUpdateRemoteStoreCustomData(indexMetadataBuilder, index.getName()); + migrationImdUpdater.maybeUpdateRemoteStoreCustomMetadata(indexMetadataBuilder, index.getName()); migrationImdUpdater.maybeAddRemoteIndexSettings(indexMetadataBuilder, index.getName()); } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 04f1ede874e16..78c8adbd379d3 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -740,7 +740,7 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS, - RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_CKP_UPLOAD_SETTING + RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA ) ) ); diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index bf544cf19912f..b1b805fbe6b7a 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -763,7 +763,7 @@ public static IndexMergePolicy fromString(String text) { private final boolean widenIndexSortType; private final boolean assignedOnRemoteNode; private final RemoteStorePathStrategy remoteStorePathStrategy; - private final boolean translogCkpAsMetadataUploadAllowed; + private final boolean ckpAsTranslogMetadata; /** * The maximum age of a retention lease before it is considered expired. @@ -990,7 +990,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti assignedOnRemoteNode = RemoteStoreNodeAttribute.isRemoteDataAttributePresent(this.getNodeSettings()); remoteStorePathStrategy = RemoteStoreUtils.determineRemoteStorePathStrategy(indexMetadata); - translogCkpAsMetadataUploadAllowed = RemoteStoreUtils.determineTranslogCkpUploadAsMetadataAllowed(indexMetadata); + ckpAsTranslogMetadata = RemoteStoreUtils.determineCkpAsTranslogMetadata(indexMetadata); setEnableFuzzySetForDocId(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING)); setDocIdFuzzySetFalsePositiveProbability(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING)); @@ -1915,7 +1915,7 @@ public RemoteStorePathStrategy getRemoteStorePathStrategy() { return remoteStorePathStrategy; } - public boolean getTranslogCkpAsMetadataUploadAllowed() { - return translogCkpAsMetadataUploadAllowed; + public boolean isCkpAsTranslogMetadata() { + return ckpAsTranslogMetadata; } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdater.java b/server/src/main/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdater.java index d1d835d811a7a..5a03ad8eb14dd 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdater.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdater.java @@ -28,7 +28,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; -import static org.opensearch.index.remote.RemoteStoreUtils.determineRemoteStoreCustomDataDuringMigration; +import static org.opensearch.index.remote.RemoteStoreUtils.determineRemoteStoreCustomMetadataDuringMigration; import static org.opensearch.index.remote.RemoteStoreUtils.getRemoteStoreRepoName; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; @@ -127,12 +127,12 @@ private boolean needsRemoteIndexSettingsUpdate( * @param indexMetadataBuilder Mutated {@link IndexMetadata.Builder} having the previous state updates * @param index index name */ - public void maybeUpdateRemoteStoreCustomData(IndexMetadata.Builder indexMetadataBuilder, String index) { - if (indexHasRemoteCustomData(indexMetadata) == false) { + public void maybeUpdateRemoteStoreCustomMetadata(IndexMetadata.Builder indexMetadataBuilder, String index) { + if (indexHasRemoteCustomMetadata(indexMetadata) == false) { logger.info("Adding remote store custom data for index [{}] during migration", index); indexMetadataBuilder.putCustom( REMOTE_STORE_CUSTOM_KEY, - determineRemoteStoreCustomDataDuringMigration(clusterSettings, discoveryNodes) + determineRemoteStoreCustomMetadataDuringMigration(clusterSettings, discoveryNodes) ); } else { logger.debug("Index {} already has remote store custom data", index); @@ -140,7 +140,7 @@ public void maybeUpdateRemoteStoreCustomData(IndexMetadata.Builder indexMetadata } public static boolean indexHasAllRemoteStoreRelatedMetadata(IndexMetadata indexMetadata) { - return indexHasRemoteStoreSettings(indexMetadata.getSettings()) && indexHasRemoteCustomData(indexMetadata); + return indexHasRemoteStoreSettings(indexMetadata.getSettings()) && indexHasRemoteCustomMetadata(indexMetadata); } /** @@ -167,7 +167,7 @@ public static boolean indexHasRemoteStoreSettings(Settings indexSettings) { * @param indexMetadata Current index metadata * @return true if all above conditions match. false otherwise */ - public static boolean indexHasRemoteCustomData(IndexMetadata indexMetadata) { + public static boolean indexHasRemoteCustomMetadata(IndexMetadata indexMetadata) { Map customMetadata = indexMetadata.getCustomData(REMOTE_STORE_CUSTOM_KEY); return Objects.nonNull(customMetadata) && Objects.nonNull(customMetadata.get(PathType.NAME)) diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStoreCustomDataResolver.java b/server/src/main/java/org/opensearch/index/remote/RemoteStoreCustomMetadataResolver.java similarity index 81% rename from server/src/main/java/org/opensearch/index/remote/RemoteStoreCustomDataResolver.java rename to server/src/main/java/org/opensearch/index/remote/RemoteStoreCustomMetadataResolver.java index 18d9bcc7b398d..d481a9cc5421c 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStoreCustomDataResolver.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStoreCustomMetadataResolver.java @@ -22,17 +22,17 @@ * @opensearch.internal */ @ExperimentalApi -public class RemoteStoreCustomDataResolver { +public class RemoteStoreCustomMetadataResolver { private final RemoteStoreSettings remoteStoreSettings; private final Supplier minNodeVersionSupplier; - public RemoteStoreCustomDataResolver(RemoteStoreSettings remoteStoreSettings, Supplier minNodeVersionSupplier) { + public RemoteStoreCustomMetadataResolver(RemoteStoreSettings remoteStoreSettings, Supplier minNodeVersionSupplier) { this.remoteStoreSettings = remoteStoreSettings; this.minNodeVersionSupplier = minNodeVersionSupplier; } - public RemoteStorePathStrategy get() { + public RemoteStorePathStrategy getPathStrategy() { PathType pathType; PathHashAlgorithm pathHashAlgorithm; // Min node version check ensures that we are enabling the new prefix type only when all the nodes understand it. @@ -42,8 +42,8 @@ public RemoteStorePathStrategy get() { return new RemoteStorePathStrategy(pathType, pathHashAlgorithm); } - public boolean getRemoteStoreTranslogCkpAsMetadataAllowed() { - return Version.CURRENT.compareTo(minNodeVersionSupplier.get()) <= 0 && remoteStoreSettings.getEnableTranslogCkpAsMetadataUpload(); + public boolean isCkpAsTranslogMetadata() { + return Version.CURRENT.compareTo(minNodeVersionSupplier.get()) <= 0 && remoteStoreSettings.isCkpAsTranslogMetadata(); } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java index 2b1496851c11c..3b307e72dafa5 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java @@ -32,7 +32,7 @@ import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING; import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING; -import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_CKP_UPLOAD_SETTING; +import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA; /** * Utils for remote store @@ -185,7 +185,7 @@ public static RemoteStorePathStrategy determineRemoteStorePathStrategy(IndexMeta /** * Determines whether translog ckp upload as metadata allowed or not */ - public static boolean determineTranslogCkpUploadAsMetadataAllowed(IndexMetadata indexMetadata) { + public static boolean determineCkpAsTranslogMetadata(IndexMetadata indexMetadata) { Map remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY); assert remoteCustomData == null || remoteCustomData.containsKey(RemoteStoreEnums.CKP_AS_METADATA); if (remoteCustomData != null && remoteCustomData.containsKey(RemoteStoreEnums.CKP_AS_METADATA)) { @@ -198,25 +198,27 @@ public static boolean determineTranslogCkpUploadAsMetadataAllowed(IndexMetadata * Generates the remote store path type information to be added to custom data of index metadata during migration * * @param clusterSettings Current Cluster settings from {@link ClusterState} - * @param discoveryNodes Current {@link DiscoveryNodes} from the cluster state + * @param discoveryNodes Current {@link DiscoveryNodes} from the cluster state * @return {@link Map} to be added as custom data in index metadata */ - public static Map determineRemoteStoreCustomDataDuringMigration( + public static Map determineRemoteStoreCustomMetadataDuringMigration( Settings clusterSettings, DiscoveryNodes discoveryNodes ) { + Map remoteCustomData = new HashMap<>(); Version minNodeVersion = discoveryNodes.getMinNodeVersion(); + + boolean ckpAsMetadata = Version.CURRENT.compareTo(minNodeVersion) <= 0 + && CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA.get(clusterSettings); + remoteCustomData.put(RemoteStoreEnums.CKP_AS_METADATA, Boolean.toString(ckpAsMetadata)); + RemoteStoreEnums.PathType pathType = Version.CURRENT.compareTo(minNodeVersion) <= 0 ? CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.get(clusterSettings) : RemoteStoreEnums.PathType.FIXED; RemoteStoreEnums.PathHashAlgorithm pathHashAlgorithm = pathType == RemoteStoreEnums.PathType.FIXED ? null : CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING.get(clusterSettings); - boolean ckpAsMetadata = Version.CURRENT.compareTo(minNodeVersion) <= 0 - && CLUSTER_REMOTE_STORE_TRANSLOG_CKP_UPLOAD_SETTING.get(clusterSettings); - Map remoteCustomData = new HashMap<>(); remoteCustomData.put(RemoteStoreEnums.PathType.NAME, pathType.name()); - remoteCustomData.put(RemoteStoreEnums.CKP_AS_METADATA, Boolean.toString(ckpAsMetadata)); if (Objects.nonNull(pathHashAlgorithm)) { remoteCustomData.put(RemoteStoreEnums.PathHashAlgorithm.NAME, pathHashAlgorithm.name()); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 6e9c694e6baff..735c98157bf0b 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -5002,7 +5002,7 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException { remoteStoreSettings, logger, shouldSeedRemoteStore(), - indexSettings.getTranslogCkpAsMetadataUploadAllowed() + indexSettings.isCkpAsTranslogMetadata() ); } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 225d10754fc81..c75353cc3a2b6 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -112,10 +112,7 @@ public RemoteFsTranslog( this.startedPrimarySupplier = startedPrimarySupplier; this.remoteTranslogTransferTracker = remoteTranslogTransferTracker; - boolean ckpAsTranslogMetadata = isCkpAsTranslogMetadata( - indexSettings().getTranslogCkpAsMetadataUploadAllowed(), - blobStoreRepository - ); + boolean ckpAsTranslogMetadata = isCkpAsTranslogMetadata(indexSettings().isCkpAsTranslogMetadata(), blobStoreRepository); fileTransferTracker = FileTransferTrackerFactory.getFileTransferTracker( shardId, remoteTranslogTransferTracker, @@ -174,8 +171,8 @@ RemoteTranslogTransferTracker getRemoteTranslogTracker() { return remoteTranslogTransferTracker; } - private static boolean isCkpAsTranslogMetadata(boolean isCkpAsMetadataUploadAllowed, BlobStoreRepository blobStoreRepository) { - return blobStoreRepository.blobStore().isBlobMetadataSupported() && isCkpAsMetadataUploadAllowed; + private static boolean isCkpAsTranslogMetadata(boolean ckpAsTranslogMetadata, BlobStoreRepository blobStoreRepository) { + return blobStoreRepository.blobStore().isBlobMetadataSupported() && ckpAsTranslogMetadata; } public static void download( @@ -187,7 +184,7 @@ public static void download( RemoteStoreSettings remoteStoreSettings, Logger logger, boolean seedRemote, - boolean ckpAsMetadataUploadAllowed + boolean ckpAsTranslogMetadata ) throws IOException { assert repository instanceof BlobStoreRepository : String.format( Locale.ROOT, @@ -196,7 +193,7 @@ public static void download( ); BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository; - boolean ckpAsTranslogMetadata = isCkpAsTranslogMetadata(ckpAsMetadataUploadAllowed, blobStoreRepository); + ckpAsTranslogMetadata = isCkpAsTranslogMetadata(ckpAsTranslogMetadata, blobStoreRepository); // We use a dummy stats tracker to ensure the flow doesn't break. // TODO: To be revisited as part of https://github.com/opensearch-project/OpenSearch/issues/7567 RemoteTranslogTransferTracker remoteTranslogTransferTracker = new RemoteTranslogTransferTracker(shardId, 1000); @@ -454,8 +451,8 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws } // Visible for testing - public Set allUploaded() { - return fileTransferTracker.allUploadedGeneration(); + int allUploadedSize() { + return fileTransferTracker.allUploadedGenerationSize(); } private boolean syncToDisk() throws IOException { @@ -575,7 +572,7 @@ public void trimUnreferencedReaders() throws IOException { // This enables us to restore translog from the metadata in case of failover or relocation. Set generationsToDelete = new HashSet<>(); for (long generation = minRemoteGenReferenced - 1 - indexSettings().getRemoteTranslogExtraKeep(); generation >= 0; generation--) { - if (fileTransferTracker.isGenerationUploaded(generation) == false) { + if (fileTransferTracker.uploaded(generation) == false) { break; } generationsToDelete.add(generation); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index a83180395ccf6..8a0cf04e33a78 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -173,12 +173,8 @@ public InputStream downloadBlob(Iterable path, String fileName) throws I @ExperimentalApi public InputStreamWithMetadata downloadBlobWithMetadata(Iterable path, String fileName) throws IOException { // If the blob store supports metadata, retrieve the blob with metadata; otherwise, retrieve the blob without metadata. - if (blobStore.isBlobMetadataSupported()) { - return blobStore.blobContainer((BlobPath) path).readBlobWithMetadata(fileName); - } else { - InputStream inputStream = blobStore.blobContainer((BlobPath) path).readBlob(fileName); - return new InputStreamWithMetadata(inputStream, null); - } + assert blobStore.isBlobMetadataSupported(); + return blobStore.blobContainer((BlobPath) path).readBlobWithMetadata(fileName); } @Override diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java index 1fc64c9313735..c8f6e03634445 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java @@ -94,12 +94,7 @@ public boolean equals(Object o) { @Override public String toString() { - return new StringBuilder("FileInfo [").append(" name = ") - .append(name) - .append(", path = ") - .append(path.toUri()) - .append("]") - .toString(); + return "FileInfo [" + " name = " + name + ", path = " + path.toUri() + "]"; } @Override @@ -117,10 +112,6 @@ public static class TransferFileSnapshot extends FileSnapshot { private final long primaryTerm; private Long checksum; - public TransferFileSnapshot(Path path, long primaryTerm, Long checksum) throws IOException { - this(path, primaryTerm, checksum, null); - } - public TransferFileSnapshot(Path path, long primaryTerm, Long checksum, Map metadata) throws IOException { super(path, metadata); this.primaryTerm = primaryTerm; @@ -167,8 +158,7 @@ public static final class TranslogFileSnapshot extends TransferFileSnapshot { private final long generation; public TranslogFileSnapshot(long primaryTerm, long generation, Path path, Long checksum) throws IOException { - super(path, primaryTerm, checksum); - this.generation = generation; + this(primaryTerm, generation, path, checksum, null); } public TranslogFileSnapshot(long primaryTerm, long generation, Path path, Long checksum, Map metadata) @@ -211,7 +201,7 @@ public static final class CheckpointFileSnapshot extends TransferFileSnapshot { public CheckpointFileSnapshot(long primaryTerm, long generation, long minTranslogGeneration, Path path, Long checksum) throws IOException { - super(path, primaryTerm, checksum); + super(path, primaryTerm, checksum, null); this.minTranslogGeneration = minTranslogGeneration; this.generation = generation; } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java index 67f61f40466fa..6c843e8a0167e 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java @@ -12,7 +12,6 @@ import org.opensearch.common.logging.Loggers; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.remote.RemoteTranslogTransferTracker; -import org.opensearch.index.translog.transfer.listener.FileTransferListener; import java.util.Map; import java.util.Objects; @@ -26,9 +25,9 @@ * * @opensearch.internal */ -public abstract class FileTransferTracker implements FileTransferListener { +public abstract class FileTransferTracker { - final Map generationTransferTracker; + final Map generationTransferTracker; final RemoteTranslogTransferTracker remoteTranslogTransferTracker; Map bytesForTlogCkpFileToUpload; long fileTransferStartTime = -1; @@ -40,18 +39,6 @@ public FileTransferTracker(ShardId shardId, RemoteTranslogTransferTracker remote this.logger = Loggers.getLogger(getClass(), shardId); } - @Override - public abstract void onSuccess(TranslogCheckpointSnapshot fileSnapshot); - - @Override - public abstract void onFailure(TranslogCheckpointSnapshot fileSnapshot, Exception e); - - public abstract boolean isUploaded(String key); - - abstract void deleteGenerations(Set generations); - - abstract void recordBytesForFiles(Set toUpload); - void recordFileTransferStartTime(long uploadStartTime) { // Recording the start time more than once for a sync is invalid if (fileTransferStartTime == -1) { @@ -59,6 +46,8 @@ void recordFileTransferStartTime(long uploadStartTime) { } } + abstract void recordBytesForFiles(Set toUpload); + void recordFileContentLength(String fileName, LongSupplier contentLengthSupplier) { bytesForTlogCkpFileToUpload.put(fileName, contentLengthSupplier.getAsLong()); } @@ -67,21 +56,45 @@ long getTotalBytesToUpload() { return bytesForTlogCkpFileToUpload.values().stream().reduce(0L, Long::sum); } + public abstract void onSuccess(TranslogCheckpointSnapshot fileSnapshot); + void addGeneration(long generation, boolean success) { TransferState targetState = success ? TransferState.SUCCESS : TransferState.FAILED; - updateTransferState(generationTransferTracker, Long.toString(generation), targetState); + updateTransferState(generationTransferTracker, generation, targetState); + } + + /** + * Updates the transfer state for the given key in the specified tracker map. + */ + void updateTransferState(Map tracker, K key, TransferState targetState) { + tracker.compute(key, (k, v) -> { + if (v == null || v.validateNextState(targetState)) { + return targetState; + } + throw new IllegalStateException("Unexpected transfer state " + v + " while setting target to " + targetState); + }); + } + + public abstract void onFailure(TranslogCheckpointSnapshot fileSnapshot, Exception e); + + abstract void deleteGenerations(Set generations); + + public boolean uploaded(Long generation) { + return generationTransferTracker.get(generation) == TransferState.SUCCESS; } - public boolean isGenerationUploaded(Long generation) { - return generationTransferTracker.get(Long.toString(generation)) == TransferState.SUCCESS; + Set exclusionFilter(Set original) { + return original.stream() + .filter(fileSnapshot -> generationTransferTracker.get(fileSnapshot.getGeneration()) != TransferState.SUCCESS) + .collect(Collectors.toSet()); } - public Set allUploadedGeneration() { - return getSuccessfulKeys(generationTransferTracker); + public int allUploadedGenerationSize() { + return generationTransferTracker.size(); } /** - * @param bytes bytes to add in remote translog transfer tracker. + * @param bytes bytes to add in remote translog transfer tracker. * @param isSuccess represent if provided bytes failed or succeeded */ void updateUploadBytesInRemoteTranslogTransferTracker(long bytes, boolean isSuccess) { @@ -102,36 +115,6 @@ void updateTranslogTransferStats(String fileName, boolean isSuccess) { updateUploadBytesInRemoteTranslogTransferTracker(fileBytes, isSuccess); } - Set exclusionFilter(Set original) { - return original.stream() - .filter(fileSnapshot -> generationTransferTracker.get(Long.toString(fileSnapshot.getGeneration())) != TransferState.SUCCESS) - .collect(Collectors.toSet()); - } - - /** - * Updates the transfer state for the given key in the specified tracker map. - */ - void updateTransferState(Map tracker, K key, TransferState targetState) { - tracker.compute(key, (k, v) -> { - if (v == null || v.validateNextState(targetState)) { - return targetState; - } - throw new IllegalStateException("Unexpected transfer state " + v + " while setting target to " + targetState); - }); - } - - /** - * Retrieves a set of keys from the given tracker map whose corresponding values are equal to the - * {@link TransferState#SUCCESS} state. - */ - Set getSuccessfulKeys(Map tracker) { - return tracker.entrySet() - .stream() - .filter(entry -> entry.getValue() == TransferState.SUCCESS) - .map(Map.Entry::getKey) - .collect(Collectors.toSet()); - } - /** * Represents the state of the upload operation */ diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTrackerFactory.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTrackerFactory.java index de6d3fff693eb..639275d1e7e70 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTrackerFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTrackerFactory.java @@ -17,6 +17,7 @@ * @opensearch.internal */ public class FileTransferTrackerFactory { + public static FileTransferTracker getFileTransferTracker( ShardId shardId, RemoteTranslogTransferTracker remoteTranslogTransferTracker, diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java index 19a4437e4e0ed..bf5cd7e58669c 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java @@ -18,15 +18,15 @@ */ public interface TransferSnapshot { - /** - * The translog transfer metadata of this {@link TransferSnapshot} - * @return the translog transfer metadata - */ - TranslogTransferMetadata getTranslogTransferMetadata(); - /** * The set of generational translog and checkpoint snapshots * @return the set of {@link TranslogCheckpointSnapshot} */ Set getTranslogCheckpointSnapshots(); + + /** + * The translog transfer metadata of this {@link TransferSnapshot} + * @return the translog transfer metadata + */ + TranslogTransferMetadata getTranslogTransferMetadata(); } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointSnapshot.java index c0d449a29fca8..aad1044a0f45e 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointSnapshot.java @@ -107,13 +107,9 @@ TransferFileSnapshot getCheckpointFileSnapshot() throws IOException { } TransferFileSnapshot getTranslogFileSnapshotWithMetadata() throws IOException { - Map metadata = createCheckpointDataAsObjectMetadata(); - return new TranslogFileSnapshot(primaryTerm, generation, translogPath, translogChecksum, metadata); - } - - private Map createCheckpointDataAsObjectMetadata() throws IOException { byte[] fileBytes = Checkpoint.createCheckpointBytes(checkpointPath, checkpoint); - return createMetadata(fileBytes); + Map metadata = createMetadata(fileBytes); + return new TranslogFileSnapshot(primaryTerm, generation, translogPath, translogChecksum, metadata); } static Map createMetadata(byte[] ckpBytes) { diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferManager.java index 773ba0900d68c..0b7a0333f0a7a 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferManager.java @@ -26,6 +26,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Base64; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -79,7 +80,12 @@ public void transferTranslogCheckpointSnapshot( FileTransferException e = (FileTransferException) ex; TransferFileSnapshot failedSnapshot = e.getFileSnapshot(); latchedActionListener.onFailure( - new TranslogTransferException(fileToGenerationSnapshotMap.get(failedSnapshot), ex, Set.of(failedSnapshot), null) + new TranslogTransferException( + fileToGenerationSnapshotMap.get(failedSnapshot), + ex, + Set.of(failedSnapshot), + Collections.emptySet() + ) ); } ); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferTracker.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferTracker.java index 8b3f9c889cb41..f897811441ddb 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferTracker.java @@ -44,11 +44,6 @@ public void onFailure(TranslogCheckpointSnapshot fileSnapshot, Exception e) { addGeneration(fileSnapshot.getGeneration(), false); } - @Override - public boolean isUploaded(String generation) { - return super.isGenerationUploaded(Long.parseLong(generation)); - } - @Override void recordBytesForFiles(Set toUpload) { bytesForTlogCkpFileToUpload = new HashMap<>(); @@ -61,7 +56,7 @@ void recordBytesForFiles(Set toUpload) { @Override void deleteGenerations(Set generations) { for (Long generation : generations) { - generationTransferTracker.remove(Long.toString(generation)); + generationTransferTracker.remove(generation); } } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCkpFilesTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCkpFilesTransferManager.java index 0ead1eba97f6b..97f6580d7f804 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCkpFilesTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCkpFilesTransferManager.java @@ -22,6 +22,8 @@ import org.opensearch.indices.RemoteStoreSettings; import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.HashSet; @@ -37,6 +39,7 @@ * @opensearch.internal */ public class TranslogCkpFilesTransferManager extends TranslogTransferManager { + public TranslogCkpFilesTransferManager( ShardId shardId, TransferService transferService, @@ -65,30 +68,31 @@ public void transferTranslogCheckpointSnapshot( ) throws Exception { for (TranslogCheckpointSnapshot tlogAndCkpTransferFileSnapshot : toUpload) { Set filesToUpload = new HashSet<>(); - Set exceptionList = ConcurrentCollections.newConcurrentSet(); - String tlogFileName = tlogAndCkpTransferFileSnapshot.getTranslogFileName(); String ckpFileName = tlogAndCkpTransferFileSnapshot.getCheckpointFileName(); - if (fileTransferTracker.isUploaded(tlogFileName) == false) { + assert fileTransferTracker instanceof TranslogCkpFilesTransferTracker; + TranslogCkpFilesTransferTracker translogCkpFilesTransferTracker = (TranslogCkpFilesTransferTracker) fileTransferTracker; + if (translogCkpFilesTransferTracker.uploaded(tlogFileName) == false) { filesToUpload.add(tlogAndCkpTransferFileSnapshot.getTranslogFileSnapshot()); } - if (fileTransferTracker.isUploaded(ckpFileName) == false) { + if (translogCkpFilesTransferTracker.uploaded(ckpFileName) == false) { filesToUpload.add(tlogAndCkpTransferFileSnapshot.getCheckpointFileSnapshot()); } - assert !filesToUpload.isEmpty(); - AtomicBoolean listenerProcessed = new AtomicBoolean(); - final CountDownLatch latch = new CountDownLatch(filesToUpload.size()); - if (latch.getCount() == 0) { + if (filesToUpload.isEmpty()) { + logger.info("Returning from transferTranslogCheckpointSnapshot without any upload"); latchedActionListener.onResponse(tlogAndCkpTransferFileSnapshot); } + final CountDownLatch latch = new CountDownLatch(filesToUpload.size()); Set succeededFiles = ConcurrentCollections.newConcurrentSet(); Set failedFiles = new HashSet<>(); + Set exceptionList = ConcurrentCollections.newConcurrentSet(); LatchedActionListener fileUploadListener = new LatchedActionListener<>( ActionListener.wrap(succeededFiles::add, exceptionList::add), latch ); + AtomicBoolean listenerProcessed = new AtomicBoolean(); ActionListener actionListener = ActionListener.runAfter(fileUploadListener, () -> { if (latch.getCount() == 0 && listenerProcessed.compareAndSet(false, true)) { if (exceptionList.isEmpty()) { @@ -130,6 +134,34 @@ public boolean downloadTranslog(String primaryTerm, String generation, Path loca return true; } + /** + * Downloads the translog (tlog) or checkpoint (ckp) file from a remote source and saves it to the local FS. + * + * @param fileName The name of the checkpoint file (e.g., "translog.ckp"). + * @param location The local file system path where the checkpoint file will be stored. + * @param primaryTerm The primary term associated with the checkpoint file. + * @throws IOException If an I/O error occurs during the file download or copy operation. + */ + void downloadFileToFS(String fileName, Path location, String primaryTerm) throws IOException { + Path filePath = location.resolve(fileName); + // Here, we always override the existing file if present. + deleteFileIfExists(filePath); + + boolean downloadStatus = false; + long bytesToRead = 0, downloadStartTime = System.nanoTime(); + try (InputStream inputStream = transferService.downloadBlob(remoteDataTransferPath.add(primaryTerm), fileName)) { + // Capture number of bytes for stats before reading + bytesToRead = inputStream.available(); + Files.copy(inputStream, filePath); + downloadStatus = true; + } finally { + remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L); + if (downloadStatus) { + remoteTranslogTransferTracker.addDownloadBytesSucceeded(bytesToRead); + } + } + } + @Override public void deleteGenerationAsync(long primaryTerm, Set generations, Runnable onCompletion) { List translogFiles = new ArrayList<>(); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCkpFilesTransferTracker.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCkpFilesTransferTracker.java index f0e3fb167ab72..ff50813bfba50 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCkpFilesTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCkpFilesTransferTracker.java @@ -36,14 +36,8 @@ public TranslogCkpFilesTransferTracker(ShardId shardId, RemoteTranslogTransferTr public void onSuccess(TranslogCheckpointSnapshot fileSnapshot) { try { updateUploadTimeInRemoteTranslogTransferTracker(); - String tlogFileName = fileSnapshot.getTranslogFileName(); - String ckpFileName = fileSnapshot.getCheckpointFileName(); - if (isUploaded(tlogFileName) == false) { - updateTranslogTransferStats(tlogFileName, true); - } - if (isUploaded(ckpFileName) == false) { - updateTranslogTransferStats(ckpFileName, true); - } + mayBeUpdateTranslogTransferStats(fileSnapshot.getTranslogFileName()); + mayBeUpdateTranslogTransferStats(fileSnapshot.getCheckpointFileName()); } catch (Exception ex) { logger.error("Failure to update translog upload success stats", ex); } @@ -52,22 +46,28 @@ public void onSuccess(TranslogCheckpointSnapshot fileSnapshot) { addFile(fileSnapshot.getTranslogFileName(), true); } + private void mayBeUpdateTranslogTransferStats(String fileName) { + if (uploaded(fileName) == false) { + updateTranslogTransferStats(fileName, true); + } + } + public void onFailure(TranslogCheckpointSnapshot fileSnapshot, Exception e) { updateUploadTimeInRemoteTranslogTransferTracker(); addGeneration(fileSnapshot.getGeneration(), false); assert e instanceof TranslogTransferException; TranslogTransferException exception = (TranslogTransferException) e; - Set failedFiles = exception.getFailedFiles(); - Set successFiles = exception.getSuccessFiles(); - assert failedFiles.isEmpty() == false; - failedFiles.forEach(failedFile -> { - addFile(failedFile.getName(), false); - updateTranslogTransferStats(failedFile.getName(), false); - }); - successFiles.forEach(successFile -> { - addFile(successFile.getName(), true); - updateTranslogTransferStats(successFile.getName(), true); + updateFileAndStatsTracker(exception.getFailedFiles(), false); + updateFileAndStatsTracker(exception.getSuccessFiles(), true); + + assert exception.getFailedFiles().isEmpty() == false; + } + + private void updateFileAndStatsTracker(Set fileSnapshots, boolean success) { + fileSnapshots.forEach(failedFile -> { + addFile(failedFile.getName(), success); + updateTranslogTransferStats(failedFile.getName(), success); }); } @@ -76,34 +76,20 @@ public void addFile(String file, boolean success) { updateTransferState(fileTransferTracker, file, targetState); } - @Override - public boolean isUploaded(String file) { + public boolean uploaded(String file) { return fileTransferTracker.get(file) == TransferState.SUCCESS; } - // here along with generation we also mark status of files in the tracker. - @Override - void addGeneration(long generation, boolean success) { - TransferState targetState = success ? TransferState.SUCCESS : TransferState.FAILED; - updateTransferState(generationTransferTracker, Long.toString(generation), targetState); - - // add files as well. - String tlogFileName = Translog.getFilename(generation); - String ckpFileName = Translog.getCommitCheckpointFileName(generation); - addFile(tlogFileName, success); - addFile(ckpFileName, success); - } - @Override void recordBytesForFiles(Set toUpload) { bytesForTlogCkpFileToUpload = new HashMap<>(); toUpload.forEach(file -> { String tlogFileName = file.getTranslogFileName(); String ckpFileName = file.getCheckpointFileName(); - if (isUploaded(tlogFileName) == false) { + if (uploaded(tlogFileName) == false) { recordFileContentLength(tlogFileName, file::getTranslogFileContentLength); } - if (isUploaded(ckpFileName) == false) { + if (uploaded(ckpFileName) == false) { recordFileContentLength(ckpFileName, file::getCheckpointFileContentLength); } }); @@ -114,7 +100,7 @@ void deleteGenerations(Set generations) { for (Long generation : generations) { String tlogFileName = Translog.getFilename(generation); String ckpFileName = Translog.getCommitCheckpointFileName(generation); - generationTransferTracker.remove(Long.toString(generation)); + generationTransferTracker.remove(generation); fileTransferTracker.remove(tlogFileName); fileTransferTracker.remove(ckpFileName); } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 41f225bd0de6d..b56d64741d49b 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -117,14 +117,13 @@ abstract void transferTranslogCheckpointSnapshot( public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTransferListener translogTransferListener) throws IOException { + List exceptionList = new ArrayList<>(transferSnapshot.getTranslogTransferMetadata().getCount()); + Set toUpload = new HashSet<>(transferSnapshot.getTranslogTransferMetadata().getCount()); long metadataBytesToUpload; long metadataUploadStartTime; long uploadStartTime; long prevUploadBytesSucceeded = remoteTranslogTransferTracker.getUploadBytesSucceeded(); long prevUploadTimeInMillis = remoteTranslogTransferTracker.getTotalUploadTimeInMillis(); - int totalFilesCount = transferSnapshot.getTranslogTransferMetadata().getCount(); - List exceptionList = new ArrayList<>(totalFilesCount); - Set toUpload = new HashSet<>(totalFilesCount); try { toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogCheckpointSnapshots())); @@ -167,7 +166,6 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans // TODO: Ideally each file's upload start time should be when it is actually picked for upload // https://github.com/opensearch-project/OpenSearch/issues/9729 fileTransferTracker.recordFileTransferStartTime(uploadStartTime); - transferTranslogCheckpointSnapshot(toUpload, blobPathMap, latchedActionListener); try { @@ -250,37 +248,6 @@ private void captureStatsOnUploadFailure() { public abstract boolean downloadTranslog(String primaryTerm, String generation, Path location) throws IOException; - /** - * Downloads the checkpoint (ckp) file from a remote source and saves it to the local FS. - * - *

This method retrieves the checkpoint file "translog.ckp", from a remote - * storage location and copies it to the specified local file system path. - * - * @param fileName The name of the checkpoint file (e.g., "translog.ckp"). - * @param location The local file system path where the checkpoint file will be stored. - * @param primaryTerm The primary term associated with the checkpoint file. - * @throws IOException If an I/O error occurs during the file download or copy operation. - */ - void downloadFileToFS(String fileName, Path location, String primaryTerm) throws IOException { - Path filePath = location.resolve(fileName); - // Here, we always override the existing file if present. - deleteFileIfExists(filePath); - - boolean downloadStatus = false; - long bytesToRead = 0, downloadStartTime = System.nanoTime(); - try (InputStream inputStream = transferService.downloadBlob(remoteDataTransferPath.add(primaryTerm), fileName)) { - // Capture number of bytes for stats before reading - bytesToRead = inputStream.available(); - Files.copy(inputStream, filePath); - downloadStatus = true; - } finally { - remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L); - if (downloadStatus) { - remoteTranslogTransferTracker.addDownloadBytesSucceeded(bytesToRead); - } - } - } - void deleteFileIfExists(Path filePath) throws IOException { if (Files.exists(filePath)) { Files.delete(filePath); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java b/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java deleted file mode 100644 index ae48aa0c61281..0000000000000 --- a/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.translog.transfer.listener; - -import org.opensearch.index.translog.transfer.TranslogCheckpointSnapshot; - -/** - * The listener to be invoked on the completion or failure of a {@link TranslogCheckpointSnapshot} or deletion of file - * - * @opensearch.internal - */ -public interface FileTransferListener { - - /** - * Invoked when the transfer of a single {@link TranslogCheckpointSnapshot} succeeds - * @param fileSnapshot the corresponding file snapshot - */ - void onSuccess(TranslogCheckpointSnapshot fileSnapshot); - - /** - * Invoked when the transfer of a single {@link TranslogCheckpointSnapshot} fails - * @param fileSnapshot the corresponding file snapshot - * @param e the exception while processing the {@link TranslogCheckpointSnapshot} - */ - void onFailure(TranslogCheckpointSnapshot fileSnapshot, Exception e); - -} diff --git a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java index a7fa6be6e64e3..134ec5018fb9f 100644 --- a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java +++ b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java @@ -82,11 +82,11 @@ public class RemoteStoreSettings { /** * This setting is used to disable uploading translog.ckp file as metadata to translog.tlog. This setting is effective only for - * remote store enabled cluster. + * repositories that supports metadata read and write with metadata and is applicable for only remote store enabled clusters. */ @ExperimentalApi - public static final Setting CLUSTER_REMOTE_STORE_TRANSLOG_CKP_UPLOAD_SETTING = Setting.boolSetting( - "cluster.remote_store.index.translog.enable_translog_ckp_as_metadata_upload", + public static final Setting CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA = Setting.boolSetting( + "cluster.remote_store.index.translog.ckp_as_metadata", true, Property.NodeScope, Property.Dynamic @@ -123,7 +123,7 @@ public class RemoteStoreSettings { private volatile RemoteStoreEnums.PathType pathType; private volatile RemoteStoreEnums.PathHashAlgorithm pathHashAlgorithm; private volatile int maxRemoteTranslogReaders; - private volatile boolean enableTranslogCkpAsMetadataUpload; + private volatile boolean ckpAsTranslogMetadata; public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) { clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings); @@ -147,11 +147,8 @@ public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) { pathType = clusterSettings.get(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING); clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING, this::setPathType); - enableTranslogCkpAsMetadataUpload = clusterSettings.get(CLUSTER_REMOTE_STORE_TRANSLOG_CKP_UPLOAD_SETTING); - clusterSettings.addSettingsUpdateConsumer( - CLUSTER_REMOTE_STORE_TRANSLOG_CKP_UPLOAD_SETTING, - this::setEnableTranslogCkpAsMetadataUpload - ); + ckpAsTranslogMetadata = clusterSettings.get(CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA, this::setCkpAsTranslogMetadata); pathHashAlgorithm = clusterSettings.get(CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING); clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING, this::setPathHashAlgorithm); @@ -198,12 +195,12 @@ private void setPathType(RemoteStoreEnums.PathType pathType) { this.pathType = pathType; } - private void setEnableTranslogCkpAsMetadataUpload(boolean enableTranslogCkpAsMetadataUpload) { - this.enableTranslogCkpAsMetadataUpload = enableTranslogCkpAsMetadataUpload; + private void setCkpAsTranslogMetadata(boolean ckpAsTranslogMetadata) { + this.ckpAsTranslogMetadata = ckpAsTranslogMetadata; } - public boolean getEnableTranslogCkpAsMetadataUpload() { - return enableTranslogCkpAsMetadataUpload; + public boolean isCkpAsTranslogMetadata() { + return ckpAsTranslogMetadata; } private void setPathHashAlgorithm(RemoteStoreEnums.PathHashAlgorithm pathHashAlgorithm) { diff --git a/server/src/main/java/org/opensearch/snapshots/RestoreService.java b/server/src/main/java/org/opensearch/snapshots/RestoreService.java index 576b45cf89ebe..93aac68eb898c 100644 --- a/server/src/main/java/org/opensearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/opensearch/snapshots/RestoreService.java @@ -472,7 +472,7 @@ public ClusterState execute(ClusterState currentState) { .put(snapshotIndexMetadata.getSettings()) .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID()) ); - createIndexService.addRemoteStoreCustomData(indexMdBuilder, false); + createIndexService.addRemoteStoreCustomMetadata(indexMdBuilder, false); shardLimitValidator.validateShardLimit( renamedIndexName, snapshotIndexMetadata.getSettings(), diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdaterTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdaterTests.java index f2d6cf9e15539..ba3d28de72020 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdaterTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdaterTests.java @@ -145,7 +145,7 @@ public void testMaybeAddRemoteIndexSettingsDoesNotUpdateIndexSettingsWithRelocat assertDocrepSettingsApplied(indexMetadataBuilder.build()); } - public void testMaybeUpdateRemoteStoreCustomDataExecutes() { + public void testMaybeUpdateRemoteStoreCustomMetadataExecutes() { Metadata currentMetadata = createIndexMetadataWithDocrepSettings(indexName); IndexMetadata existingIndexMetadata = currentMetadata.index(indexName); IndexMetadata.Builder builder = IndexMetadata.builder(existingIndexMetadata); @@ -163,11 +163,11 @@ public void testMaybeUpdateRemoteStoreCustomDataExecutes() { .build(), logger ); - migrationIndexMetadataUpdater.maybeUpdateRemoteStoreCustomData(builder, indexName); + migrationIndexMetadataUpdater.maybeUpdateRemoteStoreCustomMetadata(builder, indexName); assertCustomPathMetadataIsPresent(builder.build()); } - public void testMaybeUpdateRemoteStoreCustomDataDoesNotExecute() { + public void testMaybeUpdateRemoteStoreCustomMetadataDoesNotExecute() { Metadata currentMetadata = createIndexMetadataWithRemoteStoreSettings(indexName); IndexMetadata existingIndexMetadata = currentMetadata.index(indexName); IndexMetadata.Builder builder = IndexMetadata.builder(currentMetadata.index(indexName)); @@ -186,7 +186,7 @@ public void testMaybeUpdateRemoteStoreCustomDataDoesNotExecute() { logger ); - migrationIndexMetadataUpdater.maybeUpdateRemoteStoreCustomData(builder, indexName); + migrationIndexMetadataUpdater.maybeUpdateRemoteStoreCustomMetadata(builder, indexName); assertCustomPathMetadataIsPresent(builder.build()); } diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStoreCustomDataResolverTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStoreCustomMetadataResolverTests.java similarity index 64% rename from server/src/test/java/org/opensearch/index/remote/RemoteStoreCustomDataResolverTests.java rename to server/src/test/java/org/opensearch/index/remote/RemoteStoreCustomMetadataResolverTests.java index 56e7c5efe66af..8b7fe5cc6b385 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteStoreCustomDataResolverTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStoreCustomMetadataResolverTests.java @@ -18,40 +18,40 @@ import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING; import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING; -import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_CKP_UPLOAD_SETTING; +import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA; -public class RemoteStoreCustomDataResolverTests extends OpenSearchTestCase { +public class RemoteStoreCustomMetadataResolverTests extends OpenSearchTestCase { - public void testGetMinVersionOlder() { + public void testGetPathStrategyMinVersionOlder() { Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), randomFrom(PathType.values())).build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); - RemoteStoreCustomDataResolver resolver = new RemoteStoreCustomDataResolver(remoteStoreSettings, () -> Version.V_2_13_0); - assertEquals(PathType.FIXED, resolver.get().getType()); - assertNull(resolver.get().getHashAlgorithm()); + RemoteStoreCustomMetadataResolver resolver = new RemoteStoreCustomMetadataResolver(remoteStoreSettings, () -> Version.V_2_13_0); + assertEquals(PathType.FIXED, resolver.getPathStrategy().getType()); + assertNull(resolver.getPathStrategy().getHashAlgorithm()); } - public void testGetMinVersionNewer() { + public void testGetPathStrategyMinVersionNewer() { PathType pathType = randomFrom(PathType.values()); Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), pathType).build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); - RemoteStoreCustomDataResolver resolver = new RemoteStoreCustomDataResolver(remoteStoreSettings, () -> Version.V_2_14_0); - assertEquals(pathType, resolver.get().getType()); + RemoteStoreCustomMetadataResolver resolver = new RemoteStoreCustomMetadataResolver(remoteStoreSettings, () -> Version.V_2_14_0); + assertEquals(pathType, resolver.getPathStrategy().getType()); if (pathType.requiresHashAlgorithm()) { - assertNotNull(resolver.get().getHashAlgorithm()); + assertNotNull(resolver.getPathStrategy().getHashAlgorithm()); } else { - assertNull(resolver.get().getHashAlgorithm()); + assertNull(resolver.getPathStrategy().getHashAlgorithm()); } } - public void testGetStrategy() { + public void testGetPathStrategyStrategy() { // FIXED type Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.FIXED).build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); - RemoteStoreCustomDataResolver resolver = new RemoteStoreCustomDataResolver(remoteStoreSettings, () -> Version.V_2_14_0); - assertEquals(PathType.FIXED, resolver.get().getType()); + RemoteStoreCustomMetadataResolver resolver = new RemoteStoreCustomMetadataResolver(remoteStoreSettings, () -> Version.V_2_14_0); + assertEquals(PathType.FIXED, resolver.getPathStrategy().getType()); // FIXED type with hash algorithm settings = Settings.builder() @@ -60,24 +60,24 @@ public void testGetStrategy() { .build(); clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); - resolver = new RemoteStoreCustomDataResolver(remoteStoreSettings, () -> Version.V_2_14_0); - assertEquals(PathType.FIXED, resolver.get().getType()); + resolver = new RemoteStoreCustomMetadataResolver(remoteStoreSettings, () -> Version.V_2_14_0); + assertEquals(PathType.FIXED, resolver.getPathStrategy().getType()); // HASHED_PREFIX type with FNV_1A_COMPOSITE settings = Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.HASHED_PREFIX).build(); clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); - resolver = new RemoteStoreCustomDataResolver(remoteStoreSettings, () -> Version.V_2_14_0); - assertEquals(PathType.HASHED_PREFIX, resolver.get().getType()); - assertEquals(PathHashAlgorithm.FNV_1A_COMPOSITE_1, resolver.get().getHashAlgorithm()); + resolver = new RemoteStoreCustomMetadataResolver(remoteStoreSettings, () -> Version.V_2_14_0); + assertEquals(PathType.HASHED_PREFIX, resolver.getPathStrategy().getType()); + assertEquals(PathHashAlgorithm.FNV_1A_COMPOSITE_1, resolver.getPathStrategy().getHashAlgorithm()); // HASHED_PREFIX type with FNV_1A_COMPOSITE settings = Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.HASHED_PREFIX).build(); clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); - resolver = new RemoteStoreCustomDataResolver(remoteStoreSettings, () -> Version.V_2_14_0); - assertEquals(PathType.HASHED_PREFIX, resolver.get().getType()); - assertEquals(PathHashAlgorithm.FNV_1A_COMPOSITE_1, resolver.get().getHashAlgorithm()); + resolver = new RemoteStoreCustomMetadataResolver(remoteStoreSettings, () -> Version.V_2_14_0); + assertEquals(PathType.HASHED_PREFIX, resolver.getPathStrategy().getType()); + assertEquals(PathHashAlgorithm.FNV_1A_COMPOSITE_1, resolver.getPathStrategy().getHashAlgorithm()); // HASHED_PREFIX type with FNV_1A_BASE64 settings = Settings.builder() @@ -86,9 +86,9 @@ public void testGetStrategy() { .build(); clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); - resolver = new RemoteStoreCustomDataResolver(remoteStoreSettings, () -> Version.V_2_14_0); - assertEquals(PathType.HASHED_PREFIX, resolver.get().getType()); - assertEquals(PathHashAlgorithm.FNV_1A_BASE64, resolver.get().getHashAlgorithm()); + resolver = new RemoteStoreCustomMetadataResolver(remoteStoreSettings, () -> Version.V_2_14_0); + assertEquals(PathType.HASHED_PREFIX, resolver.getPathStrategy().getType()); + assertEquals(PathHashAlgorithm.FNV_1A_BASE64, resolver.getPathStrategy().getHashAlgorithm()); // HASHED_PREFIX type with FNV_1A_BASE64 settings = Settings.builder() @@ -97,27 +97,27 @@ public void testGetStrategy() { .build(); clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); - resolver = new RemoteStoreCustomDataResolver(remoteStoreSettings, () -> Version.V_2_14_0); - assertEquals(PathType.HASHED_PREFIX, resolver.get().getType()); - assertEquals(PathHashAlgorithm.FNV_1A_BASE64, resolver.get().getHashAlgorithm()); + resolver = new RemoteStoreCustomMetadataResolver(remoteStoreSettings, () -> Version.V_2_14_0); + assertEquals(PathType.HASHED_PREFIX, resolver.getPathStrategy().getType()); + assertEquals(PathHashAlgorithm.FNV_1A_BASE64, resolver.getPathStrategy().getHashAlgorithm()); } - public void testGetStrategyWithDynamicUpdate() { + public void testGetPathStrategyStrategyWithDynamicUpdate() { // Default value Settings settings = Settings.builder().build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); - RemoteStoreCustomDataResolver resolver = new RemoteStoreCustomDataResolver(remoteStoreSettings, () -> Version.V_2_14_0); - assertEquals(PathType.FIXED, resolver.get().getType()); - assertNull(resolver.get().getHashAlgorithm()); + RemoteStoreCustomMetadataResolver resolver = new RemoteStoreCustomMetadataResolver(remoteStoreSettings, () -> Version.V_2_14_0); + assertEquals(PathType.FIXED, resolver.getPathStrategy().getType()); + assertNull(resolver.getPathStrategy().getHashAlgorithm()); // Set HASHED_PREFIX with default hash algorithm clusterSettings.applySettings( Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.HASHED_PREFIX).build() ); - assertEquals(PathType.HASHED_PREFIX, resolver.get().getType()); - assertEquals(PathHashAlgorithm.FNV_1A_COMPOSITE_1, resolver.get().getHashAlgorithm()); + assertEquals(PathType.HASHED_PREFIX, resolver.getPathStrategy().getType()); + assertEquals(PathHashAlgorithm.FNV_1A_COMPOSITE_1, resolver.getPathStrategy().getHashAlgorithm()); // Set HASHED_PREFIX with FNV_1A_BASE64 hash algorithm clusterSettings.applySettings( @@ -126,15 +126,15 @@ public void testGetStrategyWithDynamicUpdate() { .put(CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING.getKey(), PathHashAlgorithm.FNV_1A_BASE64) .build() ); - assertEquals(PathType.HASHED_PREFIX, resolver.get().getType()); - assertEquals(PathHashAlgorithm.FNV_1A_BASE64, resolver.get().getHashAlgorithm()); + assertEquals(PathType.HASHED_PREFIX, resolver.getPathStrategy().getType()); + assertEquals(PathHashAlgorithm.FNV_1A_BASE64, resolver.getPathStrategy().getHashAlgorithm()); // Set HASHED_INFIX with default hash algorithm clusterSettings.applySettings( Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.HASHED_INFIX).build() ); - assertEquals(PathType.HASHED_INFIX, resolver.get().getType()); - assertEquals(PathHashAlgorithm.FNV_1A_COMPOSITE_1, resolver.get().getHashAlgorithm()); + assertEquals(PathType.HASHED_INFIX, resolver.getPathStrategy().getType()); + assertEquals(PathHashAlgorithm.FNV_1A_COMPOSITE_1, resolver.getPathStrategy().getHashAlgorithm()); // Set HASHED_INFIX with FNV_1A_BASE64 hash algorithm clusterSettings.applySettings( @@ -143,32 +143,32 @@ public void testGetStrategyWithDynamicUpdate() { .put(CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING.getKey(), PathHashAlgorithm.FNV_1A_BASE64) .build() ); - assertEquals(PathType.HASHED_INFIX, resolver.get().getType()); - assertEquals(PathHashAlgorithm.FNV_1A_BASE64, resolver.get().getHashAlgorithm()); + assertEquals(PathType.HASHED_INFIX, resolver.getPathStrategy().getType()); + assertEquals(PathHashAlgorithm.FNV_1A_BASE64, resolver.getPathStrategy().getHashAlgorithm()); } - public void testTranslogCkpAsMetadataAllowedMinVersionNewer() { - Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_TRANSLOG_CKP_UPLOAD_SETTING.getKey(), true).build(); + public void testTranslogCkpAsMetadataAllowedTrueWithMinVersionNewer() { + Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA.getKey(), true).build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); - RemoteStoreCustomDataResolver resolver = new RemoteStoreCustomDataResolver(remoteStoreSettings, () -> Version.CURRENT); - assertTrue(resolver.getRemoteStoreTranslogCkpAsMetadataAllowed()); + RemoteStoreCustomMetadataResolver resolver = new RemoteStoreCustomMetadataResolver(remoteStoreSettings, () -> Version.CURRENT); + assertTrue(resolver.isCkpAsTranslogMetadata()); } - public void testTranslogCkpAsMetadataAllowed2MinVersionNewer() { - Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_TRANSLOG_CKP_UPLOAD_SETTING.getKey(), false).build(); + public void testTranslogCkpAsMetadataAllowedFalseWithMinVersionNewer() { + Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA.getKey(), false).build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); - RemoteStoreCustomDataResolver resolver = new RemoteStoreCustomDataResolver(remoteStoreSettings, () -> Version.CURRENT); - assertFalse(resolver.getRemoteStoreTranslogCkpAsMetadataAllowed()); + RemoteStoreCustomMetadataResolver resolver = new RemoteStoreCustomMetadataResolver(remoteStoreSettings, () -> Version.CURRENT); + assertFalse(resolver.isCkpAsTranslogMetadata()); } public void testTranslogCkpAsMetadataAllowedMinVersionOlder() { - Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_TRANSLOG_CKP_UPLOAD_SETTING.getKey(), randomBoolean()).build(); + Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA.getKey(), randomBoolean()).build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings); - RemoteStoreCustomDataResolver resolver = new RemoteStoreCustomDataResolver(remoteStoreSettings, () -> Version.V_2_14_0); - assertFalse(resolver.getRemoteStoreTranslogCkpAsMetadataAllowed()); + RemoteStoreCustomMetadataResolver resolver = new RemoteStoreCustomMetadataResolver(remoteStoreSettings, () -> Version.V_2_14_0); + assertFalse(resolver.isCkpAsTranslogMetadata()); } } diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index 579c2b02a6026..67138192581e9 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -671,13 +671,13 @@ public void testSimpleOperationsUpload() throws Exception { assertThat(snapshot.totalOperations(), equalTo(ops.size())); } - assertEquals(2, translog.allUploaded().size()); + assertEquals(2, translog.allUploadedSize()); addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 1, primaryTerm.get(), new byte[] { 1 })); - assertEquals(3, translog.allUploaded().size()); + assertEquals(3, translog.allUploadedSize()); translog.rollGeneration(); - assertEquals(3, translog.allUploaded().size()); + assertEquals(3, translog.allUploadedSize()); Set mdFiles = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)); assertEquals(2, mdFiles.size()); @@ -722,7 +722,7 @@ public void testSimpleOperationsUpload() throws Exception { assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); assertEquals(2, translog.readers.size()); assertBusy(() -> { - assertEquals(2, translog.allUploaded().size()); + assertEquals(2, translog.allUploadedSize()); assertEquals( 4, blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() @@ -736,7 +736,7 @@ public void testSimpleOperationsUpload() throws Exception { assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); assertEquals(1, translog.readers.size()); assertBusy(() -> { - assertEquals(2, translog.allUploaded().size()); + assertEquals(2, translog.allUploadedSize()); assertEquals( 4, blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() @@ -755,7 +755,7 @@ public void testSimpleOperationsUpload() throws Exception { assertEquals(1, translog.readers.size()); assertEquals(1, translog.stats().estimatedNumberOfOperations()); assertBusy(() -> { - assertEquals(2, translog.allUploaded().size()); + assertEquals(2, translog.allUploadedSize()); assertEquals( 4, blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() @@ -775,7 +775,7 @@ public void testMetadataFileDeletion() throws Exception { assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); assertEquals(1, translog.readers.size()); } - assertBusy(() -> assertEquals(2, translog.allUploaded().size())); + assertBusy(() -> assertEquals(2, translog.allUploadedSize())); assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); int moreDocs = randomIntBetween(3, 10); logger.info("numDocs={} moreDocs={}", numDocs, moreDocs); @@ -785,7 +785,7 @@ public void testMetadataFileDeletion() throws Exception { translog.trimUnreferencedReaders(); assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); assertEquals(1 + moreDocs, translog.readers.size()); - assertBusy(() -> assertEquals(1 + (long) moreDocs, translog.allUploaded().size())); + assertBusy(() -> assertEquals(1 + (long) moreDocs, translog.allUploadedSize())); assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); int totalDocs = numDocs + moreDocs; @@ -841,11 +841,11 @@ public void testDrainSync() throws Exception { // 3. After drainSync, if trimUnreferencedReaders is attempted, we do not delete from remote store. // 4. After drainSync, if an upload is an attempted, we do not upload to remote store. ArrayList ops = new ArrayList<>(); - assertEquals(0, translog.allUploaded().size()); + assertEquals(0, translog.allUploadedSize()); assertEquals(1, translog.readers.size()); addToTranslogAndListAndUpload(translog, ops, new Translog.Index(String.valueOf(0), 0, primaryTerm.get(), new byte[] { 1 })); - assertEquals(2, translog.allUploaded().size()); + assertEquals(2, translog.allUploadedSize()); assertEquals(2, translog.readers.size()); assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); @@ -873,7 +873,7 @@ public void testDrainSync() throws Exception { assertBusy(() -> assertEquals(0, latch.getCount())); assertEquals(0, translog.availablePermits()); slowDown.setSleepSeconds(0); - assertEquals(3, translog.allUploaded().size()); + assertEquals(3, translog.allUploadedSize()); assertEquals(2, translog.readers.size()); Set mdFiles = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)); @@ -882,7 +882,7 @@ public void testDrainSync() throws Exception { translog.trimUnreferencedReaders(); assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); assertEquals(1, translog.readers.size()); - assertEquals(3, translog.allUploaded().size()); + assertEquals(3, translog.allUploadedSize()); assertEquals(mdFiles, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR))); // Case 4 - After drainSync, if an upload is an attempted, we do not upload to remote store. @@ -892,21 +892,21 @@ public void testDrainSync() throws Exception { new Translog.Index(String.valueOf(2), 2, primaryTerm.get(), new byte[] { 1 }) ); assertEquals(1, translog.readers.size()); - assertEquals(3, translog.allUploaded().size()); + assertEquals(3, translog.allUploadedSize()); assertEquals(mdFiles, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR))); // Refill the permits back Releasables.close(releasable); addToTranslogAndListAndUpload(translog, ops, new Translog.Index(String.valueOf(3), 3, primaryTerm.get(), new byte[] { 1 })); assertEquals(2, translog.readers.size()); - assertEquals(4, translog.allUploaded().size()); + assertEquals(4, translog.allUploadedSize()); assertEquals(3, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); translog.setMinSeqNoToKeep(3); translog.trimUnreferencedReaders(); assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); assertEquals(1, translog.readers.size()); - assertBusy(() -> assertEquals(2, translog.allUploaded().size())); + assertBusy(() -> assertEquals(2, translog.allUploadedSize())); assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceMockRepositoryTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceMockRepositoryTests.java index a806eea381297..eadc6984d5fde 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceMockRepositoryTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceMockRepositoryTests.java @@ -57,7 +57,8 @@ public void testUploadBlobs() throws Exception { FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot( testFile, randomNonNegativeLong(), - 0L + 0L, + null ); AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class); @@ -104,7 +105,8 @@ public void testUploadBlobsIOException() throws Exception { FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot( testFile, randomNonNegativeLong(), - 0L + 0L, + null ); AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class); @@ -143,7 +145,8 @@ public void testUploadBlobsUploadFutureCompletedExceptionally() throws Exception FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot( testFile, randomNonNegativeLong(), - 0L + 0L, + null ); AsyncMultiStreamBlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class); diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java index e4f5a454b15f6..3d97ba26bcdd3 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java @@ -53,6 +53,7 @@ public void testUploadBlob() throws IOException { FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot( testFile, randomNonNegativeLong(), + null, null ); TransferService transferService = new BlobStoreTransferService(repository.blobStore(), threadPool); @@ -76,6 +77,7 @@ public void testUploadBlobAsync() throws IOException, InterruptedException { FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot( testFile, randomNonNegativeLong(), + null, null ); CountDownLatch latch = new CountDownLatch(1); diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/FileSnapshotTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/FileSnapshotTests.java index 2d75851e888a5..38a0a85bb5cb2 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/FileSnapshotTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/FileSnapshotTests.java @@ -28,15 +28,15 @@ public void tearDown() throws Exception { public void testFileSnapshotPath() throws IOException { Path file = createTempFile(); Files.writeString(file, "hello"); - fileSnapshot = new FileSnapshot.TransferFileSnapshot(file, 12, null); + fileSnapshot = new FileSnapshot.TransferFileSnapshot(file, 12, null, null); assertFileSnapshotProperties(file); - try (FileSnapshot sameFileSnapshot = new FileSnapshot.TransferFileSnapshot(file, 12, null)) { + try (FileSnapshot sameFileSnapshot = new FileSnapshot.TransferFileSnapshot(file, 12, null, null)) { assertEquals(sameFileSnapshot, fileSnapshot); } - try (FileSnapshot sameFileDiffPTSnapshot = new FileSnapshot.TransferFileSnapshot(file, 34, null)) { + try (FileSnapshot sameFileDiffPTSnapshot = new FileSnapshot.TransferFileSnapshot(file, 34, null, null)) { assertNotEquals(sameFileDiffPTSnapshot, fileSnapshot); } } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferManagerTests.java index aa13f10081e87..b7ab4cb28221e 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferManagerTests.java @@ -224,7 +224,7 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) { assertEquals(0, fileTransferFailed.get()); assertEquals(1, translogTransferSucceeded.get()); assertEquals(0, translogTransferFailed.get()); - assertEquals(2, fileTransferTracker.allUploadedGeneration().size()); + assertEquals(2, fileTransferTracker.allUploadedGenerationSize()); } public void testTransferSnapshotOnUploadTimeout() throws Exception { @@ -426,11 +426,11 @@ public void testDeleteTranslogSuccess_when_ckpStoredAsMetadata() throws Exceptio String translogFile = "translog-19.tlog"; tracker.addGeneration(19, true); // tracker.add(checkpointFile, true); - assertEquals(1, tracker.allUploadedGeneration().size()); + assertEquals(1, tracker.allUploadedGenerationSize()); List verifyDeleteFilesList = List.of(translogFile); translogTransferManager.deleteGenerationAsync(primaryTerm, Set.of(19L), () -> {}); - assertBusy(() -> assertEquals(0, tracker.allUploadedGeneration().size())); + assertBusy(() -> assertEquals(0, tracker.allUploadedGenerationSize())); // only translog.tlog file will be sent for delete. verify(blobContainer).deleteBlobsIgnoringIfNotExists(eq(verifyDeleteFilesList)); } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferTrackerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferTrackerTests.java index a5c9df12b7054..3ae8dcebf31c6 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogCkpAsMetadataFileTransferTrackerTests.java @@ -67,7 +67,7 @@ public void testOnSuccess_WhenCkpAsMetadata() throws IOException { // idempotent remoteTranslogTransferTracker.addUploadBytesStarted(fileSize + ckpFileSize); fileTransferTrackerCkpAsMetadata.onSuccess(transferFileSnapshot); - assertEquals(fileTransferTrackerCkpAsMetadata.allUploadedGeneration().size(), 1); + assertEquals(fileTransferTrackerCkpAsMetadata.allUploadedGenerationSize(), 1); try { remoteTranslogTransferTracker.addUploadBytesStarted(fileSize + ckpFileSize); fileTransferTrackerCkpAsMetadata.onFailure(transferFileSnapshot, new IOException("random exception")); @@ -126,11 +126,11 @@ public void testOnFailure_WhenCkpAsMetadata() throws IOException { ) ); fileTransferTrackerCkpAsMetadata.onSuccess(translogCheckpointSnapshot2); - assertEquals(fileTransferTrackerCkpAsMetadata.allUploadedGeneration().size(), 1); + assertEquals(fileTransferTrackerCkpAsMetadata.allUploadedGenerationSize(), 1); remoteTranslogTransferTracker.addUploadBytesStarted(fileSize * 2); fileTransferTrackerCkpAsMetadata.onSuccess(translogCheckpointSnapshot1); - assertEquals(fileTransferTrackerCkpAsMetadata.allUploadedGeneration().size(), 2); + assertEquals(fileTransferTrackerCkpAsMetadata.allUploadedGenerationSize(), 2); translogFileSnapshot1.close(); } @@ -163,7 +163,7 @@ public void testOnSuccessStatsFailure_WhenCkpAsMetadata() throws IOException { localFileTransferTracker.recordBytesForFiles(toUpload); localRemoteTranslogTransferTracker.addUploadBytesStarted(2 * fileSize); localFileTransferTracker.onSuccess(transferFileSnapshot); - assertEquals(localFileTransferTracker.allUploadedGeneration().size(), 1); + assertEquals(localFileTransferTracker.allUploadedGenerationSize(), 1); } public void testUploaded_WhenCkpAsMetadata() throws IOException { @@ -188,10 +188,10 @@ public void testUploaded_WhenCkpAsMetadata() throws IOException { fileTransferTrackerCkpAsMetadata.recordBytesForFiles(toUpload); remoteTranslogTransferTracker.addUploadBytesStarted(2 * fileSize); fileTransferTrackerCkpAsMetadata.onSuccess(transferFileSnapshot); - assertTrue(fileTransferTrackerCkpAsMetadata.isGenerationUploaded(generation)); - assertFalse(fileTransferTrackerCkpAsMetadata.isGenerationUploaded(generation + 2)); + assertTrue(fileTransferTrackerCkpAsMetadata.uploaded(generation)); + assertFalse(fileTransferTrackerCkpAsMetadata.uploaded(generation + 2)); fileTransferTrackerCkpAsMetadata.deleteGenerations(Set.of(generation)); - assertFalse(fileTransferTrackerCkpAsMetadata.isGenerationUploaded(generation)); + assertFalse(fileTransferTrackerCkpAsMetadata.uploaded(generation)); } } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogCkpFilesTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogCkpFilesTransferManagerTests.java index 2dc68395b2858..9fb2cbb55b332 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogCkpFilesTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogCkpFilesTransferManagerTests.java @@ -193,7 +193,7 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) { assertEquals(0, fileTransferFailed.get()); assertEquals(1, translogTransferSucceeded.get()); assertEquals(0, translogTransferFailed.get()); - assertEquals(2, fileTransferTracker.allUploadedGeneration().size()); + assertEquals(2, fileTransferTracker.allUploadedGenerationSize()); } public void testTransferSnapshotOnUploadTimeout() throws Exception { @@ -515,11 +515,11 @@ public void testDeleteTranslogSuccess() throws Exception { tracker.addGeneration(19, true); tracker.addFile(translogFile, true); tracker.addFile(checkpointFile, true); - assertEquals(1, tracker.allUploadedGeneration().size()); + assertEquals(1, tracker.allUploadedGenerationSize()); List files = List.of(checkpointFile, translogFile); translogTransferManager.deleteGenerationAsync(primaryTerm, Set.of(19L), () -> {}); - assertBusy(() -> assertEquals(0, tracker.allUploadedGeneration().size())); + assertBusy(() -> assertEquals(0, tracker.allUploadedGenerationSize())); verify(blobContainer).deleteBlobsIgnoringIfNotExists(eq(files)); } @@ -585,10 +585,10 @@ public void testDeleteTranslogFailure() throws Exception { tracker.addFile(translogFile, true); tracker.addFile(checkpointFile, true); tracker.addGeneration(19, true); - assertEquals(1, tracker.allUploadedGeneration().size()); + assertEquals(1, tracker.allUploadedGenerationSize()); translogTransferManager.deleteGenerationAsync(primaryTerm, Set.of(19L), () -> {}); - assertEquals(1, tracker.allUploadedGeneration().size()); + assertEquals(1, tracker.allUploadedGenerationSize()); } private void assertNoDownloadStats(boolean nonZeroUploadTime) { diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogCkpFilesTransferTrackerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogCkpFilesTransferTrackerTests.java index 2e1f9a512a313..f7a158f222deb 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogCkpFilesTransferTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogCkpFilesTransferTrackerTests.java @@ -68,7 +68,7 @@ public void testOnSuccess() throws IOException { // idempotent remoteTranslogTransferTracker.addUploadBytesStarted(fileSize + ckpFileSize); fileTransferTracker.onSuccess(transferFileSnapshot); - assertEquals(fileTransferTracker.allUploadedGeneration().size(), 1); + assertEquals(fileTransferTracker.allUploadedGenerationSize(), 1); try { remoteTranslogTransferTracker.addUploadBytesStarted(fileSize + ckpFileSize); fileTransferTracker.onFailure(transferFileSnapshot, new IOException("random exception")); @@ -130,11 +130,11 @@ public void testOnFailure() throws IOException { ) ); fileTransferTracker.onSuccess(translogCheckpointSnapshot2); - assertEquals(fileTransferTracker.allUploadedGeneration().size(), 1); + assertEquals(fileTransferTracker.allUploadedGenerationSize(), 1); remoteTranslogTransferTracker.addUploadBytesStarted(fileSize * 2); fileTransferTracker.onSuccess(translogCheckpointSnapshot1); - assertEquals(fileTransferTracker.allUploadedGeneration().size(), 2); + assertEquals(fileTransferTracker.allUploadedGenerationSize(), 2); checkpointFileSnapshot1.close(); translogFileSnapshot1.close(); @@ -172,7 +172,7 @@ public void testOnSuccessStatsFailure() throws IOException { localFileTransferTracker.recordBytesForFiles(toUpload); localRemoteTranslogTransferTracker.addUploadBytesStarted(2 * fileSize); localFileTransferTracker.onSuccess(transferFileSnapshot); - assertEquals(localFileTransferTracker.allUploadedGeneration().size(), 1); + assertEquals(localFileTransferTracker.allUploadedGenerationSize(), 1); } public void testUploaded() throws IOException { @@ -199,17 +199,17 @@ public void testUploaded() throws IOException { fileTransferTracker.onSuccess(transferFileSnapshot); String tlogFileName = String.valueOf(testFile.getFileName()); String ckpFileName = String.valueOf(ckpFile.getFileName()); - assertTrue(fileTransferTracker.isUploaded(tlogFileName)); - assertTrue(fileTransferTracker.isUploaded(ckpFileName)); - assertTrue(fileTransferTracker.isGenerationUploaded(generation)); - assertFalse(fileTransferTracker.isGenerationUploaded(generation + 2)); - assertFalse(fileTransferTracker.isUploaded("random-name")); + assertTrue(fileTransferTracker.uploaded(tlogFileName)); + assertTrue(fileTransferTracker.uploaded(ckpFileName)); + assertTrue(fileTransferTracker.uploaded(generation)); + assertFalse(fileTransferTracker.uploaded(generation + 2)); + assertFalse(fileTransferTracker.uploaded("random-name")); fileTransferTracker.deleteGenerations(Set.of(generation)); - assertFalse(fileTransferTracker.isGenerationUploaded(generation)); + assertFalse(fileTransferTracker.uploaded(generation)); fileTransferTracker.deleteGenerations(Set.of(generation)); - assertFalse(fileTransferTracker.isUploaded(Long.toString(generation))); + assertFalse(fileTransferTracker.uploaded(Long.toString(generation))); }