Incorporate PR review feedback
Signed-off-by: Ashish Singh <[email protected]>
ashking94 committed Apr 15, 2024
1 parent c4b5a9d commit 67906bd
Showing 4 changed files with 324 additions and 205 deletions.
@@ -0,0 +1,46 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import org.opensearch.cluster.metadata.IndexMetadata;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
* Hook for running code that needs to be executed before the upload of index metadata during index creation or
* after enabling the remote cluster state for the first time. The listener is intended to run asynchronously and
* in parallel with the index metadata upload.
*
* @opensearch.internal
*/
public interface IndexCreationPreIndexMetadataUploadListener {

/**
* Returns the additional count that the caller must add to the latch in {@link RemoteClusterStateService} that is
* used to make the index metadata uploads parallel and asynchronous. The same latch is also used to run this pre
* index metadata upload listener.
*
* @param newIndexMetadataList list of index metadata of new indices (or indices whose metadata is being uploaded for the first time).
* @return latch count to be added by the caller.
*/
int latchCount(List<IndexMetadata> newIndexMetadataList);

/**
* Runs the pre index metadata upload listener with the given {@code newIndexMetadataList}, {@code latch} and
* {@code exceptionList}. Implementations must execute the work asynchronously and in parallel so that the overall
* cluster state upload time does not increase.
*
* @param newIndexMetadataList list of index metadata of new indices (or indices whose metadata is being uploaded for the first time).
* @param latch counted down once per index as its unit of work completes.
* @param exceptionList any exception raised during the run is added here for the caller to inspect.
*/
void run(List<IndexMetadata> newIndexMetadataList, CountDownLatch latch, List<Exception> exceptionList) throws IOException;
}
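To make the contract concrete, here is a minimal sketch (not part of this commit) of a listener implementation; the class name, the injected executor, and the per-index work body are assumptions made purely for illustration.

package org.opensearch.gateway.remote;

import org.opensearch.cluster.metadata.IndexMetadata;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;

// Hypothetical implementation used only to illustrate the contract above.
public class ExampleIndexCreationListener implements IndexCreationPreIndexMetadataUploadListener {

    private final ExecutorService executor; // assumed to be supplied by the caller

    public ExampleIndexCreationListener(ExecutorService executor) {
        this.executor = executor;
    }

    @Override
    public int latchCount(List<IndexMetadata> newIndexMetadataList) {
        // One latched unit of work per new index.
        return newIndexMetadataList.size();
    }

    @Override
    public void run(List<IndexMetadata> newIndexMetadataList, CountDownLatch latch, List<Exception> exceptionList) throws IOException {
        for (IndexMetadata indexMetadata : newIndexMetadataList) {
            // Submit asynchronously so this work runs in parallel with the index metadata upload.
            executor.submit(() -> {
                try {
                    // ... per-index pre-upload work for indexMetadata goes here ...
                } catch (Exception e) {
                    exceptionList.add(e); // surfaced to the caller after the latch is drained
                } finally {
                    latch.countDown(); // count down exactly once per index
                }
            });
        }
    }
}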
@@ -26,16 +26,11 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.Index;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.index.remote.RemoteIndexPath;
import org.opensearch.index.remote.RemoteStoreEnums.DataCategory;
import org.opensearch.index.remote.RemoteStoreEnums.DataType;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.node.Node;
@@ -70,9 +65,6 @@
import java.util.stream.Collectors;

import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD;
import static org.opensearch.index.remote.RemoteIndexPath.SEGMENT_PATH;
import static org.opensearch.index.remote.RemoteIndexPath.TRANSLOG_PATH;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
@@ -175,14 +167,10 @@ public class RemoteClusterStateService implements Closeable {
private final Settings settings;
private final LongSupplier relativeTimeNanosSupplier;
private final ThreadPool threadpool;
private final IndexCreationPreIndexMetadataUploadListener indexCreationListener;
private BlobStoreRepository blobStoreRepository;
private BlobStoreTransferService blobStoreTransferService;
private volatile TimeValue slowWriteLoggingThreshold;
private BlobStoreRepository translogRepository;
private BlobStoreTransferService translogTransferService;
private BlobStoreRepository segmentRepository;
private BlobStoreTransferService segmentsTransferService;
private final boolean isRemoteDataAttributePresent;

private volatile TimeValue indexMetadataUploadTimeout;
private volatile TimeValue globalMetadataUploadTimeout;
@@ -210,6 +198,18 @@ public RemoteClusterStateService(
ClusterSettings clusterSettings,
LongSupplier relativeTimeNanosSupplier,
ThreadPool threadPool
) {
this(nodeId, repositoriesService, settings, clusterSettings, relativeTimeNanosSupplier, threadPool, null);
}

public RemoteClusterStateService(
String nodeId,
Supplier<RepositoriesService> repositoriesService,
Settings settings,
ClusterSettings clusterSettings,
LongSupplier relativeTimeNanosSupplier,
ThreadPool threadPool,
IndexCreationPreIndexMetadataUploadListener indexCreationListener
) {
assert isRemoteStoreClusterStateEnabled(settings) : "Remote cluster state is not enabled";
this.nodeId = nodeId;
@@ -226,7 +226,7 @@ public RemoteClusterStateService(
clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout);
clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout);
this.remoteStateStats = new RemotePersistenceStats();
this.isRemoteDataAttributePresent = isRemoteDataAttributePresent(settings);
this.indexCreationListener = indexCreationListener;
}
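For callers, a hypothetical call site for the new seven-argument constructor might look like the following; the variable names and the listener instance are placeholders, not code from this commit.

RemoteClusterStateService remoteClusterStateService = new RemoteClusterStateService(
    nodeId,
    () -> repositoriesService,   // Supplier<RepositoriesService>
    settings,
    clusterSettings,
    System::nanoTime,            // LongSupplier for relative time
    threadPool,
    new ExampleIndexCreationListener(executor) // listener from the sketch above
);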

private BlobStoreTransferService getBlobStoreTransferService() {
@@ -236,20 +236,6 @@ private BlobStoreTransferService getBlobStoreTransferService() {
return blobStoreTransferService;
}

private BlobStoreTransferService getTranslogTransferService() {
if (translogTransferService == null) {
translogTransferService = new BlobStoreTransferService(translogRepository.blobStore(), threadpool);
}
return translogTransferService;
}

private BlobStoreTransferService getSegmentsTransferService() {
if (segmentsTransferService == null) {
segmentsTransferService = new BlobStoreTransferService(segmentRepository.blobStore(), threadpool);
}
return segmentsTransferService;
}

/**
* This method uploads the entire cluster state metadata to the configured blob store. For now, only index metadata upload is supported. This method should be
* invoked by the elected cluster manager when the remote cluster state is enabled.
@@ -482,15 +468,15 @@ private String writeGlobalMetadata(ClusterState clusterState) throws IOException
private List<UploadedIndexMetadata> writeIndexMetadataParallel(
ClusterState clusterState,
List<IndexMetadata> toUpload,
List<IndexMetadata> toUploadIndexPath
List<IndexMetadata> newIndexMetadataList
) throws IOException {
boolean isTranslogSegmentRepoSame = isTranslogSegmentRepoSame();
int latchCount = toUpload.size() + (isTranslogSegmentRepoSame ? toUploadIndexPath.size() : 2 * toUploadIndexPath.size());
assert Objects.nonNull(indexCreationListener) : "indexCreationListener cannot be null";
int latchCount = toUpload.size() + indexCreationListener.latchCount(newIndexMetadataList);
List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>(latchCount));
final CountDownLatch latch = new CountDownLatch(latchCount);
List<UploadedIndexMetadata> result = new ArrayList<>(toUpload.size());
uploadIndexMetadataAsync(clusterState, result, toUpload, latch, exceptionList);
uploadIndexPathAsync(toUploadIndexPath, latch, isTranslogSegmentRepoSame, exceptionList);
indexCreationListener.run(newIndexMetadataList, latch, exceptionList);

try {
if (latch.await(getIndexMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
@@ -531,102 +517,6 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(
return result;
}
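The latch sizing above is the crux: the single CountDownLatch must be counted down once per metadata upload and once per unit of listener work, or await() can never release. A standalone sketch of that pattern follows; the task counts, timeout, and work bodies are illustrative only.

// Standalone sketch of the latch pattern used above; names and values are placeholders.
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchPatternSketch {
    public static void main(String[] args) throws InterruptedException {
        int uploadTaskCount = 3;   // stands in for toUpload.size()
        int listenerTaskCount = 2; // stands in for indexCreationListener.latchCount(newIndexMetadataList)
        int latchCount = uploadTaskCount + listenerTaskCount;

        List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>(latchCount));
        CountDownLatch latch = new CountDownLatch(latchCount);

        for (int i = 0; i < latchCount; i++) {
            new Thread(() -> {
                try {
                    // ... asynchronous unit of work (an upload or a listener task) ...
                } catch (Exception e) {
                    exceptionList.add(e);
                } finally {
                    latch.countDown(); // every task counts down exactly once
                }
            }).start();
        }

        // The caller blocks once for all uploads and listener work together.
        if (latch.await(20_000, TimeUnit.MILLISECONDS) == false) {
            throw new IllegalStateException("Timed out with " + latch.getCount() + " tasks pending");
        }
    }
}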

private void uploadIndexPathAsync(
List<IndexMetadata> toUploadIndexPath,
CountDownLatch latch,
boolean isTranslogSegmentRepoSame,
List<Exception> exceptionList
) throws IOException {
for (IndexMetadata indexMetadata : toUploadIndexPath) {
writeIndexPathAsync(indexMetadata, latch, isTranslogSegmentRepoSame, exceptionList);
}
}

private void writeIndexPathAsync(
IndexMetadata idxMD,
CountDownLatch latch,
boolean isTranslogSegmentRepoSame,
List<Exception> exceptionList
) throws IOException {
Map<String, String> remoteCustomData = idxMD.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
PathType pathType = PathType.valueOf(remoteCustomData.get(PathType.NAME));
PathHashAlgorithm hashAlgorithm = PathHashAlgorithm.valueOf(remoteCustomData.get(PathHashAlgorithm.NAME));
String indexUUID = idxMD.getIndexUUID();
int shardCount = idxMD.getNumberOfShards();
BlobPath translogBasePath = translogRepository.basePath();
BlobContainer translogBlobContainer = translogRepository.blobStore().blobContainer(translogBasePath.add(RemoteIndexPath.DIR));

if (isTranslogSegmentRepoSame) {
// If the repositories are same, then we need to upload a single file containing paths for both translog and segments.
Map<DataCategory, List<DataType>> pathCreationMap = new HashMap<>();
pathCreationMap.putAll(TRANSLOG_PATH);
pathCreationMap.putAll(SEGMENT_PATH);
REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority(
new RemoteIndexPath(indexUUID, shardCount, translogBasePath, pathType, hashAlgorithm, pathCreationMap),
translogBlobContainer,
indexUUID,
translogRepository.getCompressor(),
getUploadPathLatchedActionListener(idxMD, latch, exceptionList, pathCreationMap),
FORMAT_PARAMS,
true,
XContentType.JSON
);
} else {
// If the repositories are different, then we need to upload one file per segment and translog containing their individual
// paths.
REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority(
new RemoteIndexPath(indexUUID, shardCount, translogBasePath, pathType, hashAlgorithm, TRANSLOG_PATH),
translogBlobContainer,
indexUUID,
translogRepository.getCompressor(),
getUploadPathLatchedActionListener(idxMD, latch, exceptionList, TRANSLOG_PATH),
FORMAT_PARAMS,
true,
XContentType.JSON
);

BlobPath segmentBasePath = segmentRepository.basePath();
BlobContainer segmentBlobContainer = segmentRepository.blobStore().blobContainer(segmentBasePath.add(RemoteIndexPath.DIR));
REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority(
new RemoteIndexPath(indexUUID, shardCount, segmentBasePath, pathType, hashAlgorithm, SEGMENT_PATH),
segmentBlobContainer,
indexUUID,
segmentRepository.getCompressor(),
getUploadPathLatchedActionListener(idxMD, latch, exceptionList, SEGMENT_PATH),
FORMAT_PARAMS,
true,
XContentType.JSON
);
}
}

private LatchedActionListener<Void> getUploadPathLatchedActionListener(
IndexMetadata indexMetadata,
CountDownLatch latch,
List<Exception> exceptionList,
Map<DataCategory, List<DataType>> pathCreationMap
) {
return new LatchedActionListener<>(
ActionListener.wrap(
resp -> logger.trace(
new ParameterizedMessage("Index path uploaded for {} indexMetadata={}", pathCreationMap, indexMetadata)
),
ex -> {
logger.error(
new ParameterizedMessage(
"Exception during Index path upload for {} indexMetadata={}",
pathCreationMap,
indexMetadata
),
ex
);
exceptionList.add(ex);
}
),
latch
);
}

private void uploadIndexMetadataAsync(
ClusterState clusterState,
List<UploadedIndexMetadata> result,
@@ -657,62 +547,28 @@ private void uploadIndexMetadataAsync(
}
}

private boolean isTranslogSegmentRepoSame() {
String translogRepoName = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY
);
String segmentRepoName = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY
);
return Objects.equals(translogRepoName, segmentRepoName);
}

private List<UploadedIndexMetadata> writeIndexMetadataParallel(
ClusterState clusterState,
List<IndexMetadata> toUpload,
String previousClusterUUID
) throws IOException {
List<IndexMetadata> toUploadIndexPath = Collections.emptyList();
List<IndexMetadata> newIndexMetadataList = Collections.emptyList();
if (ClusterState.UNKNOWN_UUID.equals(previousClusterUUID)) {
toUploadIndexPath = toUpload;
newIndexMetadataList = toUpload;
}
return writeIndexMetadataParallel(clusterState, toUpload, toUploadIndexPath);
return writeIndexMetadataParallel(clusterState, toUpload, newIndexMetadataList);
}

private List<UploadedIndexMetadata> writeIndexMetadataParallel(
ClusterState clusterState,
List<IndexMetadata> toUpload,
Map<String, Long> indexNamePreviousVersionMap
) throws IOException {
List<IndexMetadata> toUploadIndexPath = Collections.emptyList();
if (isRemoteDataAttributePresent) {
toUploadIndexPath = toUpload.stream()
/* If the previous state's index metadata version is null, then this is index creation */
.filter(indexMetadata -> Objects.isNull(indexNamePreviousVersionMap.get(indexMetadata.getIndex().getName())))
/* Checks the condition if the Index path needs to be uploaded or not */
.filter(this::uploadIndexPathFile)
.collect(Collectors.toList());
}
return writeIndexMetadataParallel(clusterState, toUpload, toUploadIndexPath);
}

/**
* This method checks if the index metadata has attributes that calls for uploading the index path for remote store
* uploads. It checks if the remote store path type is {@code HASHED_PREFIX} and returns true if so.
*/
private boolean uploadIndexPathFile(IndexMetadata indexMetadata) {
assert isRemoteDataAttributePresent : "Remote data attributes is expected to be present";
// A cluster will have remote custom metadata only if the cluster is remote store enabled from data side.
Map<String, String> remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY);
if (Objects.isNull(remoteCustomData) || remoteCustomData.isEmpty()) {
return false;
}
String pathTypeStr = remoteCustomData.get(PathType.NAME);
if (Objects.isNull(pathTypeStr)) {
return false;
}
// We need to upload the path only if the path type for an index is hashed_prefix
return PathType.HASHED_PREFIX == PathType.parseString(pathTypeStr);
List<IndexMetadata> newIndexMetadataList = toUpload.stream()
/* If the previous state's index metadata version is null, then this is index creation */
.filter(indexMetadata -> Objects.isNull(indexNamePreviousVersionMap.get(indexMetadata.getIndex().getName())))
.collect(Collectors.toList());
return writeIndexMetadataParallel(clusterState, toUpload, newIndexMetadataList);
}

/**
@@ -783,32 +639,13 @@ public void close() throws IOException {

public void start() {
assert isRemoteStoreClusterStateEnabled(settings) == true : "Remote cluster state is not enabled";
blobStoreRepository = (BlobStoreRepository) validateAndGetRepository(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY,
"Cluster State"
final String remoteStoreRepo = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY
);

if (isRemoteDataAttributePresent == false) {
// If remote store data attributes are not present than we skip this.
return;
}
translogRepository = (BlobStoreRepository) validateAndGetRepository(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY,
"Translog"
);
segmentRepository = (BlobStoreRepository) validateAndGetRepository(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY,
"Translog"
);

}

private Repository validateAndGetRepository(String repoSetting, String repoName) {
final String repo = settings.get(repoSetting);
assert repo != null : "Remote " + repoName + " repository is not configured";
final Repository repository = repositoriesService.get().repository(repo);
assert remoteStoreRepo != null : "Remote Cluster State repository is not configured";
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
return repository;
blobStoreRepository = (BlobStoreRepository) repository;
}

private ClusterMetadataManifest uploadManifest(
@@ -887,16 +724,6 @@ private void writeMetadataManifest(String clusterName, String clusterUUID, Clust
);
}

private String fetchPreviousClusterUUID(String clusterName, String clusterUUID) {
final Optional<ClusterMetadataManifest> latestManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID);
if (!latestManifest.isPresent()) {
final String previousClusterUUID = getLastKnownUUIDFromRemote(clusterName);
assert !clusterUUID.equals(previousClusterUUID) : "Last cluster UUID is same current cluster UUID";
return previousClusterUUID;
}
return latestManifest.get().getPreviousClusterUUID();
}

private BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX
return blobStoreRepository.blobStore()
