Skip to content

Commit

Permalink
Make changes for given comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed May 13, 2024
1 parent bd58d92 commit fd5450f
Show file tree
Hide file tree
Showing 33 changed files with 287 additions and 366 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.MapperService.MergeReason;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.remote.RemoteStoreCustomDataResolver;
import org.opensearch.index.remote.RemoteStoreCustomMetadataResolver;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
Expand Down Expand Up @@ -178,7 +178,7 @@ public class MetadataCreateIndexService {
private AwarenessReplicaBalance awarenessReplicaBalance;

@Nullable
private final RemoteStoreCustomDataResolver remoteStoreCustomDataResolver;
private final RemoteStoreCustomMetadataResolver remoteStoreCustomMetadataResolver;

public MetadataCreateIndexService(
final Settings settings,
Expand Down Expand Up @@ -213,8 +213,8 @@ public MetadataCreateIndexService(
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
createIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_INDEX_KEY, true);
Supplier<Version> minNodeVersionSupplier = () -> clusterService.state().nodes().getMinNodeVersion();
remoteStoreCustomDataResolver = isRemoteDataAttributePresent(settings)
? new RemoteStoreCustomDataResolver(remoteStoreSettings, minNodeVersionSupplier)
remoteStoreCustomMetadataResolver = isRemoteDataAttributePresent(settings)
? new RemoteStoreCustomMetadataResolver(remoteStoreSettings, minNodeVersionSupplier)
: null;
}

Expand Down Expand Up @@ -563,7 +563,7 @@ IndexMetadata buildAndValidateTemporaryIndexMetadata(
tmpImdBuilder.setRoutingNumShards(routingNumShards);
tmpImdBuilder.settings(indexSettings);
tmpImdBuilder.system(isSystem);
addRemoteStoreCustomData(tmpImdBuilder, true);
addRemoteStoreCustomMetadata(tmpImdBuilder, true);

// Set up everything, now locally create the index to see that things are ok, and apply
IndexMetadata tempMetadata = tmpImdBuilder.build();
Expand All @@ -573,27 +573,28 @@ IndexMetadata buildAndValidateTemporaryIndexMetadata(
}

/**
* Adds the remote store path type information in custom data of index metadata.
* Adds the 1) remote store path type 2) ckp as translog metadata information in custom data of index metadata.
*
* @param tmpImdBuilder index metadata builder.
* @param assertNullOldType flag to verify that the old remote store path type is null
*/
public void addRemoteStoreCustomData(IndexMetadata.Builder tmpImdBuilder, boolean assertNullOldType) {
if (remoteStoreCustomDataResolver == null) {
public void addRemoteStoreCustomMetadata(IndexMetadata.Builder tmpImdBuilder, boolean assertNullOldType) {
if (remoteStoreCustomMetadataResolver == null) {
return;
}
// It is possible that remote custom data exists already. In such cases, we need to only update the path type
// in the remote store custom data map.
Map<String, String> existingCustomData = tmpImdBuilder.removeCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
assert assertNullOldType == false || Objects.isNull(existingCustomData);

// Determine the path type for use using the remoteStorePathResolver.
RemoteStorePathStrategy newPathStrategy = remoteStoreCustomDataResolver.get();
Map<String, String> remoteCustomData = new HashMap<>();

boolean translocCkpAsMetadataUploadAllowed = remoteStoreCustomDataResolver.getRemoteStoreTranslogCkpAsMetadataAllowed();
remoteCustomData.put(RemoteStoreEnums.CKP_AS_METADATA, Boolean.toString(translocCkpAsMetadataUploadAllowed));
// Determine if the ckp would be stored as translog metadata
boolean isCkpAsTranslogMetadata = remoteStoreCustomMetadataResolver.isCkpAsTranslogMetadata();
remoteCustomData.put(RemoteStoreEnums.CKP_AS_METADATA, Boolean.toString(isCkpAsTranslogMetadata));

// Determine the path type for use using the remoteStorePathResolver.
RemoteStorePathStrategy newPathStrategy = remoteStoreCustomMetadataResolver.getPathStrategy();
remoteCustomData.put(PathType.NAME, newPathStrategy.getType().name());
if (Objects.nonNull(newPathStrategy.getHashAlgorithm())) {
remoteCustomData.put(PathHashAlgorithm.NAME, newPathStrategy.getHashAlgorithm().name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public Metadata applyChanges(Metadata oldMetadata, RoutingTable newRoutingTable,
oldMetadata.settings(),
logger
);
migrationImdUpdater.maybeUpdateRemoteStoreCustomData(indexMetadataBuilder, index.getName());
migrationImdUpdater.maybeUpdateRemoteStoreCustomMetadata(indexMetadataBuilder, index.getName());
migrationImdUpdater.maybeAddRemoteIndexSettings(indexMetadataBuilder, index.getName());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_CKP_UPLOAD_SETTING
RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA
)
)
);
Expand Down
8 changes: 4 additions & 4 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ public static IndexMergePolicy fromString(String text) {
private final boolean widenIndexSortType;
private final boolean assignedOnRemoteNode;
private final RemoteStorePathStrategy remoteStorePathStrategy;
private final boolean translogCkpAsMetadataUploadAllowed;
private final boolean ckpAsTranslogMetadata;

/**
* The maximum age of a retention lease before it is considered expired.
Expand Down Expand Up @@ -990,7 +990,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
assignedOnRemoteNode = RemoteStoreNodeAttribute.isRemoteDataAttributePresent(this.getNodeSettings());
remoteStorePathStrategy = RemoteStoreUtils.determineRemoteStorePathStrategy(indexMetadata);

translogCkpAsMetadataUploadAllowed = RemoteStoreUtils.determineTranslogCkpUploadAsMetadataAllowed(indexMetadata);
ckpAsTranslogMetadata = RemoteStoreUtils.determineCkpAsTranslogMetadata(indexMetadata);

setEnableFuzzySetForDocId(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING));
setDocIdFuzzySetFalsePositiveProbability(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING));
Expand Down Expand Up @@ -1915,7 +1915,7 @@ public RemoteStorePathStrategy getRemoteStorePathStrategy() {
return remoteStorePathStrategy;
}

public boolean getTranslogCkpAsMetadataUploadAllowed() {
return translogCkpAsMetadataUploadAllowed;
public boolean isCkpAsTranslogMetadata() {
return ckpAsTranslogMetadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.index.remote.RemoteStoreUtils.determineRemoteStoreCustomDataDuringMigration;
import static org.opensearch.index.remote.RemoteStoreUtils.determineRemoteStoreCustomMetadataDuringMigration;
import static org.opensearch.index.remote.RemoteStoreUtils.getRemoteStoreRepoName;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
Expand Down Expand Up @@ -127,20 +127,20 @@ private boolean needsRemoteIndexSettingsUpdate(
* @param indexMetadataBuilder Mutated {@link IndexMetadata.Builder} having the previous state updates
* @param index index name
*/
public void maybeUpdateRemoteStoreCustomData(IndexMetadata.Builder indexMetadataBuilder, String index) {
if (indexHasRemoteCustomData(indexMetadata) == false) {
public void maybeUpdateRemoteStoreCustomMetadata(IndexMetadata.Builder indexMetadataBuilder, String index) {
if (indexHasRemoteCustomMetadata(indexMetadata) == false) {
logger.info("Adding remote store custom data for index [{}] during migration", index);
indexMetadataBuilder.putCustom(
REMOTE_STORE_CUSTOM_KEY,
determineRemoteStoreCustomDataDuringMigration(clusterSettings, discoveryNodes)
determineRemoteStoreCustomMetadataDuringMigration(clusterSettings, discoveryNodes)
);
} else {
logger.debug("Index {} already has remote store custom data", index);
}
}

public static boolean indexHasAllRemoteStoreRelatedMetadata(IndexMetadata indexMetadata) {
return indexHasRemoteStoreSettings(indexMetadata.getSettings()) && indexHasRemoteCustomData(indexMetadata);
return indexHasRemoteStoreSettings(indexMetadata.getSettings()) && indexHasRemoteCustomMetadata(indexMetadata);
}

/**
Expand All @@ -167,7 +167,7 @@ public static boolean indexHasRemoteStoreSettings(Settings indexSettings) {
* @param indexMetadata Current index metadata
* @return <code>true</code> if all above conditions match. <code>false</code> otherwise
*/
public static boolean indexHasRemoteCustomData(IndexMetadata indexMetadata) {
public static boolean indexHasRemoteCustomMetadata(IndexMetadata indexMetadata) {
Map<String, String> customMetadata = indexMetadata.getCustomData(REMOTE_STORE_CUSTOM_KEY);
return Objects.nonNull(customMetadata)
&& Objects.nonNull(customMetadata.get(PathType.NAME))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@
* @opensearch.internal
*/
@ExperimentalApi
public class RemoteStoreCustomDataResolver {
public class RemoteStoreCustomMetadataResolver {

private final RemoteStoreSettings remoteStoreSettings;
private final Supplier<Version> minNodeVersionSupplier;

public RemoteStoreCustomDataResolver(RemoteStoreSettings remoteStoreSettings, Supplier<Version> minNodeVersionSupplier) {
public RemoteStoreCustomMetadataResolver(RemoteStoreSettings remoteStoreSettings, Supplier<Version> minNodeVersionSupplier) {
this.remoteStoreSettings = remoteStoreSettings;
this.minNodeVersionSupplier = minNodeVersionSupplier;
}

public RemoteStorePathStrategy get() {
public RemoteStorePathStrategy getPathStrategy() {
PathType pathType;
PathHashAlgorithm pathHashAlgorithm;
// Min node version check ensures that we are enabling the new prefix type only when all the nodes understand it.
Expand All @@ -42,8 +42,8 @@ public RemoteStorePathStrategy get() {
return new RemoteStorePathStrategy(pathType, pathHashAlgorithm);
}

public boolean getRemoteStoreTranslogCkpAsMetadataAllowed() {
return Version.CURRENT.compareTo(minNodeVersionSupplier.get()) <= 0 && remoteStoreSettings.getEnableTranslogCkpAsMetadataUpload();
public boolean isCkpAsTranslogMetadata() {
return Version.CURRENT.compareTo(minNodeVersionSupplier.get()) <= 0 && remoteStoreSettings.isCkpAsTranslogMetadata();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_CKP_UPLOAD_SETTING;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA;

/**
* Utils for remote store
Expand Down Expand Up @@ -185,7 +185,7 @@ public static RemoteStorePathStrategy determineRemoteStorePathStrategy(IndexMeta
/**
* Determines whether translog ckp upload as metadata allowed or not
*/
public static boolean determineTranslogCkpUploadAsMetadataAllowed(IndexMetadata indexMetadata) {
public static boolean determineCkpAsTranslogMetadata(IndexMetadata indexMetadata) {
Map<String, String> remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
assert remoteCustomData == null || remoteCustomData.containsKey(RemoteStoreEnums.CKP_AS_METADATA);
if (remoteCustomData != null && remoteCustomData.containsKey(RemoteStoreEnums.CKP_AS_METADATA)) {
Expand All @@ -198,25 +198,27 @@ public static boolean determineTranslogCkpUploadAsMetadataAllowed(IndexMetadata
* Generates the remote store path type information to be added to custom data of index metadata during migration
*
* @param clusterSettings Current Cluster settings from {@link ClusterState}
* @param discoveryNodes Current {@link DiscoveryNodes} from the cluster state
* @param discoveryNodes Current {@link DiscoveryNodes} from the cluster state
* @return {@link Map} to be added as custom data in index metadata
*/
public static Map<String, String> determineRemoteStoreCustomDataDuringMigration(
public static Map<String, String> determineRemoteStoreCustomMetadataDuringMigration(
Settings clusterSettings,
DiscoveryNodes discoveryNodes
) {
Map<String, String> remoteCustomData = new HashMap<>();
Version minNodeVersion = discoveryNodes.getMinNodeVersion();

boolean ckpAsMetadata = Version.CURRENT.compareTo(minNodeVersion) <= 0
&& CLUSTER_REMOTE_STORE_TRANSLOG_CKP_AS_METADATA.get(clusterSettings);
remoteCustomData.put(RemoteStoreEnums.CKP_AS_METADATA, Boolean.toString(ckpAsMetadata));

RemoteStoreEnums.PathType pathType = Version.CURRENT.compareTo(minNodeVersion) <= 0
? CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.get(clusterSettings)
: RemoteStoreEnums.PathType.FIXED;
RemoteStoreEnums.PathHashAlgorithm pathHashAlgorithm = pathType == RemoteStoreEnums.PathType.FIXED
? null
: CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING.get(clusterSettings);
boolean ckpAsMetadata = Version.CURRENT.compareTo(minNodeVersion) <= 0
&& CLUSTER_REMOTE_STORE_TRANSLOG_CKP_UPLOAD_SETTING.get(clusterSettings);
Map<String, String> remoteCustomData = new HashMap<>();
remoteCustomData.put(RemoteStoreEnums.PathType.NAME, pathType.name());
remoteCustomData.put(RemoteStoreEnums.CKP_AS_METADATA, Boolean.toString(ckpAsMetadata));
if (Objects.nonNull(pathHashAlgorithm)) {
remoteCustomData.put(RemoteStoreEnums.PathHashAlgorithm.NAME, pathHashAlgorithm.name());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5002,7 +5002,7 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
remoteStoreSettings,
logger,
shouldSeedRemoteStore(),
indexSettings.getTranslogCkpAsMetadataUploadAllowed()
indexSettings.isCkpAsTranslogMetadata()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,7 @@ public RemoteFsTranslog(
this.startedPrimarySupplier = startedPrimarySupplier;
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;

boolean ckpAsTranslogMetadata = isCkpAsTranslogMetadata(
indexSettings().getTranslogCkpAsMetadataUploadAllowed(),
blobStoreRepository
);
boolean ckpAsTranslogMetadata = isCkpAsTranslogMetadata(indexSettings().isCkpAsTranslogMetadata(), blobStoreRepository);
fileTransferTracker = FileTransferTrackerFactory.getFileTransferTracker(
shardId,
remoteTranslogTransferTracker,
Expand Down Expand Up @@ -174,8 +171,8 @@ RemoteTranslogTransferTracker getRemoteTranslogTracker() {
return remoteTranslogTransferTracker;
}

private static boolean isCkpAsTranslogMetadata(boolean isCkpAsMetadataUploadAllowed, BlobStoreRepository blobStoreRepository) {
return blobStoreRepository.blobStore().isBlobMetadataSupported() && isCkpAsMetadataUploadAllowed;
private static boolean isCkpAsTranslogMetadata(boolean ckpAsTranslogMetadata, BlobStoreRepository blobStoreRepository) {
return blobStoreRepository.blobStore().isBlobMetadataSupported() && ckpAsTranslogMetadata;
}

public static void download(
Expand All @@ -187,7 +184,7 @@ public static void download(
RemoteStoreSettings remoteStoreSettings,
Logger logger,
boolean seedRemote,
boolean ckpAsMetadataUploadAllowed
boolean ckpAsTranslogMetadata
) throws IOException {
assert repository instanceof BlobStoreRepository : String.format(
Locale.ROOT,
Expand All @@ -196,7 +193,7 @@ public static void download(
);
BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;

boolean ckpAsTranslogMetadata = isCkpAsTranslogMetadata(ckpAsMetadataUploadAllowed, blobStoreRepository);
ckpAsTranslogMetadata = isCkpAsTranslogMetadata(ckpAsTranslogMetadata, blobStoreRepository);
// We use a dummy stats tracker to ensure the flow doesn't break.
// TODO: To be revisited as part of https://github.com/opensearch-project/OpenSearch/issues/7567
RemoteTranslogTransferTracker remoteTranslogTransferTracker = new RemoteTranslogTransferTracker(shardId, 1000);
Expand Down Expand Up @@ -454,8 +451,8 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws
}

// Visible for testing
public Set<String> allUploaded() {
return fileTransferTracker.allUploadedGeneration();
int allUploadedSize() {
return fileTransferTracker.allUploadedGenerationSize();
}

private boolean syncToDisk() throws IOException {
Expand Down Expand Up @@ -575,7 +572,7 @@ public void trimUnreferencedReaders() throws IOException {
// This enables us to restore translog from the metadata in case of failover or relocation.
Set<Long> generationsToDelete = new HashSet<>();
for (long generation = minRemoteGenReferenced - 1 - indexSettings().getRemoteTranslogExtraKeep(); generation >= 0; generation--) {
if (fileTransferTracker.isGenerationUploaded(generation) == false) {
if (fileTransferTracker.uploaded(generation) == false) {
break;
}
generationsToDelete.add(generation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,8 @@ public InputStream downloadBlob(Iterable<String> path, String fileName) throws I
@ExperimentalApi
public InputStreamWithMetadata downloadBlobWithMetadata(Iterable<String> path, String fileName) throws IOException {
// If the blob store supports metadata, retrieve the blob with metadata; otherwise, retrieve the blob without metadata.
if (blobStore.isBlobMetadataSupported()) {
return blobStore.blobContainer((BlobPath) path).readBlobWithMetadata(fileName);
} else {
InputStream inputStream = blobStore.blobContainer((BlobPath) path).readBlob(fileName);
return new InputStreamWithMetadata(inputStream, null);
}
assert blobStore.isBlobMetadataSupported();
return blobStore.blobContainer((BlobPath) path).readBlobWithMetadata(fileName);
}

@Override
Expand Down
Loading

0 comments on commit fd5450f

Please sign in to comment.