Skip to content

Commit

Permalink
add support of new translog upload flow for new indices only
Browse files Browse the repository at this point in the history
Signed-off-by: Sandeep Kumawat <[email protected]>
  • Loading branch information
skumawat2025 committed May 10, 2024
1 parent e058a92 commit 530277c
Show file tree
Hide file tree
Showing 26 changed files with 458 additions and 336 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,11 @@
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.MapperService.MergeReason;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.remote.RemoteStoreCustomDataResolver;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.remote.RemoteStorePathStrategyResolver;
import org.opensearch.index.shard.IndexSettingProvider;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.IndexCreationException;
Expand Down Expand Up @@ -177,7 +178,7 @@ public class MetadataCreateIndexService {
private AwarenessReplicaBalance awarenessReplicaBalance;

@Nullable
private final RemoteStorePathStrategyResolver remoteStorePathStrategyResolver;
private final RemoteStoreCustomDataResolver remoteStoreCustomDataResolver;

public MetadataCreateIndexService(
final Settings settings,
Expand Down Expand Up @@ -212,8 +213,8 @@ public MetadataCreateIndexService(
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
createIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_INDEX_KEY, true);
Supplier<Version> minNodeVersionSupplier = () -> clusterService.state().nodes().getMinNodeVersion();
remoteStorePathStrategyResolver = isRemoteDataAttributePresent(settings)
? new RemoteStorePathStrategyResolver(remoteStoreSettings, minNodeVersionSupplier)
remoteStoreCustomDataResolver = isRemoteDataAttributePresent(settings)
? new RemoteStoreCustomDataResolver(remoteStoreSettings, minNodeVersionSupplier)
: null;
}

Expand Down Expand Up @@ -562,7 +563,7 @@ IndexMetadata buildAndValidateTemporaryIndexMetadata(
tmpImdBuilder.setRoutingNumShards(routingNumShards);
tmpImdBuilder.settings(indexSettings);
tmpImdBuilder.system(isSystem);
addRemoteStorePathStrategyInCustomData(tmpImdBuilder, true);
addRemoteStoreCustomData(tmpImdBuilder, true);

// Set up everything, now locally create the index to see that things are ok, and apply
IndexMetadata tempMetadata = tmpImdBuilder.build();
Expand All @@ -577,8 +578,8 @@ IndexMetadata buildAndValidateTemporaryIndexMetadata(
* @param tmpImdBuilder index metadata builder.
* @param assertNullOldType flag to verify that the old remote store path type is null
*/
public void addRemoteStorePathStrategyInCustomData(IndexMetadata.Builder tmpImdBuilder, boolean assertNullOldType) {
if (remoteStorePathStrategyResolver == null) {
public void addRemoteStoreCustomData(IndexMetadata.Builder tmpImdBuilder, boolean assertNullOldType) {
if (remoteStoreCustomDataResolver == null) {
return;
}
// It is possible that remote custom data exists already. In such cases, we need to only update the path type
Expand All @@ -587,8 +588,12 @@ public void addRemoteStorePathStrategyInCustomData(IndexMetadata.Builder tmpImdB
assert assertNullOldType == false || Objects.isNull(existingCustomData);

// Determine the path type for use using the remoteStorePathResolver.
RemoteStorePathStrategy newPathStrategy = remoteStorePathStrategyResolver.get();
RemoteStorePathStrategy newPathStrategy = remoteStoreCustomDataResolver.get();
Map<String, String> remoteCustomData = new HashMap<>();

boolean translocCkpAsMetadataUploadAllowed = remoteStoreCustomDataResolver.getRemoteStoreTranslogCkpAsMetadataAllowed();
remoteCustomData.put(RemoteStoreEnums.CKP_AS_METADATA, Boolean.toString(translocCkpAsMetadataUploadAllowed));

remoteCustomData.put(PathType.NAME, newPathStrategy.getType().name());
if (Objects.nonNull(newPathStrategy.getHashAlgorithm())) {
remoteCustomData.put(PathHashAlgorithm.NAME, newPathStrategy.getHashAlgorithm().name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,8 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS
RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_CKP_UPLOAD_SETTING
)
)
);
Expand Down
7 changes: 7 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,7 @@ public static IndexMergePolicy fromString(String text) {
private final boolean widenIndexSortType;
private final boolean assignedOnRemoteNode;
private final RemoteStorePathStrategy remoteStorePathStrategy;
private final boolean translogCkpAsMetadataUploadAllowed;

/**
* The maximum age of a retention lease before it is considered expired.
Expand Down Expand Up @@ -989,6 +990,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
assignedOnRemoteNode = RemoteStoreNodeAttribute.isRemoteDataAttributePresent(this.getNodeSettings());
remoteStorePathStrategy = RemoteStoreUtils.determineRemoteStorePathStrategy(indexMetadata);

translogCkpAsMetadataUploadAllowed = RemoteStoreUtils.determineTranslogCkpUploadAsMetadataAllowed(indexMetadata);

setEnableFuzzySetForDocId(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING));
setDocIdFuzzySetFalsePositiveProbability(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING));

Expand Down Expand Up @@ -1911,4 +1914,8 @@ public void setDocIdFuzzySetFalsePositiveProbability(double docIdFuzzySetFalsePo
public RemoteStorePathStrategy getRemoteStorePathStrategy() {
return remoteStorePathStrategy;
}

public boolean getTranslogCkpAsMetadataUploadAllowed() {
return translogCkpAsMetadataUploadAllowed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
* @opensearch.internal
*/
@ExperimentalApi
public class RemoteStorePathStrategyResolver {
public class RemoteStoreCustomDataResolver {

private final RemoteStoreSettings remoteStoreSettings;
private final Supplier<Version> minNodeVersionSupplier;

public RemoteStorePathStrategyResolver(RemoteStoreSettings remoteStoreSettings, Supplier<Version> minNodeVersionSupplier) {
public RemoteStoreCustomDataResolver(RemoteStoreSettings remoteStoreSettings, Supplier<Version> minNodeVersionSupplier) {
this.remoteStoreSettings = remoteStoreSettings;
this.minNodeVersionSupplier = minNodeVersionSupplier;
}
Expand All @@ -41,4 +41,9 @@ public RemoteStorePathStrategy get() {
pathHashAlgorithm = pathType == PathType.FIXED ? null : remoteStoreSettings.getPathHashAlgorithm();
return new RemoteStorePathStrategy(pathType, pathHashAlgorithm);
}

public boolean getRemoteStoreTranslogCkpAsMetadataAllowed() {
return Version.CURRENT.compareTo(minNodeVersionSupplier.get()) <= 0 && remoteStoreSettings.getEnableTranslogCkpAsMetadataUpload();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
@ExperimentalApi
public class RemoteStoreEnums {

public static final String CKP_AS_METADATA = "ckp-as-metadata";

/**
* Categories of the data in Remote store.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,18 @@ public static RemoteStorePathStrategy determineRemoteStorePathStrategy(IndexMeta
return new RemoteStorePathStrategy(RemoteStoreEnums.PathType.FIXED);
}

/**
* Determines whether translog ckp upload as metadata allowed or not
*/
public static boolean determineTranslogCkpUploadAsMetadataAllowed(IndexMetadata indexMetadata) {
Map<String, String> remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
assert remoteCustomData == null || remoteCustomData.containsKey(RemoteStoreEnums.CKP_AS_METADATA);
if (remoteCustomData != null && remoteCustomData.containsKey(RemoteStoreEnums.CKP_AS_METADATA)) {
return Boolean.parseBoolean(remoteCustomData.get(RemoteStoreEnums.CKP_AS_METADATA));
}
return false;
}

/**
* Generates the remote store path type information to be added to custom data of index metadata during migration
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5001,7 +5001,8 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
indexSettings.getRemoteStorePathStrategy(),
remoteStoreSettings,
logger,
shouldSeedRemoteStore()
shouldSeedRemoteStore(),
indexSettings.getTranslogCkpAsMetadataUploadAllowed()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.index.translog;

import org.opensearch.Version;
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -37,15 +36,13 @@ public class RemoteBlobStoreInternalTranslogFactory implements TranslogFactory {
private final RemoteTranslogTransferTracker remoteTranslogTransferTracker;

private final RemoteStoreSettings remoteStoreSettings;
private final Supplier<Version> minNodeVersionSupplier;

public RemoteBlobStoreInternalTranslogFactory(
Supplier<RepositoriesService> repositoriesServiceSupplier,
ThreadPool threadPool,
String repositoryName,
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
RemoteStoreSettings remoteStoreSettings,
Supplier<Version> minNodeVersionSupplier
RemoteStoreSettings remoteStoreSettings
) {
Repository repository;
try {
Expand All @@ -57,7 +54,6 @@ public RemoteBlobStoreInternalTranslogFactory(
this.threadPool = threadPool;
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
this.remoteStoreSettings = remoteStoreSettings;
this.minNodeVersionSupplier = minNodeVersionSupplier;
}

@Override
Expand All @@ -84,8 +80,7 @@ public Translog newTranslog(
threadPool,
startedPrimarySupplier,
remoteTranslogTransferTracker,
remoteStoreSettings,
minNodeVersionSupplier
remoteStoreSettings
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.index.translog;

import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SetOnce;
import org.opensearch.common.blobstore.BlobPath;
Expand Down Expand Up @@ -54,7 +53,6 @@
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
Expand Down Expand Up @@ -107,14 +105,17 @@ public RemoteFsTranslog(
ThreadPool threadPool,
BooleanSupplier startedPrimarySupplier,
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
RemoteStoreSettings remoteStoreSettings,
Supplier<Version> minNodeVersionSupplier
RemoteStoreSettings remoteStoreSettings
) throws IOException {
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer);
logger = Loggers.getLogger(getClass(), shardId);
this.startedPrimarySupplier = startedPrimarySupplier;
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
boolean ckpAsTranslogMetadata = isCkpAsTranslogMetadata(minNodeVersionSupplier, blobStoreRepository);

boolean ckpAsTranslogMetadata = isCkpAsTranslogMetadata(
indexSettings().getTranslogCkpAsMetadataUploadAllowed(),
blobStoreRepository
);
fileTransferTracker = FileTransferTrackerFactory.getFileTransferTracker(
shardId,
remoteTranslogTransferTracker,
Expand Down Expand Up @@ -173,8 +174,8 @@ RemoteTranslogTransferTracker getRemoteTranslogTracker() {
return remoteTranslogTransferTracker;
}

private boolean isCkpAsTranslogMetadata(Supplier<Version> minNodeVersionSupplier, BlobStoreRepository blobStoreRepository) {
return blobStoreRepository.blobStore().isBlobMetadataSupported() && Version.CURRENT.compareTo(minNodeVersionSupplier.get()) <= 0;
private static boolean isCkpAsTranslogMetadata(boolean isCkpAsMetadataUploadAllowed, BlobStoreRepository blobStoreRepository) {
return blobStoreRepository.blobStore().isBlobMetadataSupported() && isCkpAsMetadataUploadAllowed;
}

public static void download(
Expand All @@ -185,21 +186,24 @@ public static void download(
RemoteStorePathStrategy pathStrategy,
RemoteStoreSettings remoteStoreSettings,
Logger logger,
boolean seedRemote
boolean seedRemote,
boolean ckpAsMetadataUploadAllowed
) throws IOException {
assert repository instanceof BlobStoreRepository : String.format(
Locale.ROOT,
"%s repository should be instance of BlobStoreRepository",
shardId
);
BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;

boolean ckpAsTranslogMetadata = isCkpAsTranslogMetadata(ckpAsMetadataUploadAllowed, blobStoreRepository);
// We use a dummy stats tracker to ensure the flow doesn't break.
// TODO: To be revisited as part of https://github.com/opensearch-project/OpenSearch/issues/7567
RemoteTranslogTransferTracker remoteTranslogTransferTracker = new RemoteTranslogTransferTracker(shardId, 1000);
FileTransferTracker fileTransferTracker = FileTransferTrackerFactory.getFileTransferTracker(
shardId,
remoteTranslogTransferTracker,
false
ckpAsTranslogMetadata
);
TranslogTransferManager translogTransferManager = buildTranslogTransferManager(
blobStoreRepository,
Expand All @@ -209,7 +213,7 @@ public static void download(
remoteTranslogTransferTracker,
pathStrategy,
remoteStoreSettings,
false
ckpAsTranslogMetadata
);
RemoteFsTranslog.download(translogTransferManager, location, logger, seedRemote);
logger.trace(remoteTranslogTransferTracker.toString());
Expand Down Expand Up @@ -451,7 +455,7 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws

// Visible for testing
public Set<String> allUploaded() {
return fileTransferTracker.allUploadedFiles();
return fileTransferTracker.allUploaded();
}

private boolean syncToDisk() throws IOException {
Expand Down
Loading

0 comments on commit 530277c

Please sign in to comment.