Skip to content

Commit

Permalink
Upload remote paths during index creation or full cluster upload (ope…
Browse files Browse the repository at this point in the history
…nsearch-project#13150)

* Upload remote paths during index creation or full cluster upload

Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 authored Apr 23, 2024
1 parent 97e3191 commit 3ea0a31
Show file tree
Hide file tree
Showing 17 changed files with 1,409 additions and 128 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.util.FileSystemUtils;
import org.opensearch.index.remote.RemoteIndexPath;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.util.Locale;
import java.util.concurrent.ExecutionException;

import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreUploadIndexPathIT extends RemoteStoreBaseIntegTestCase {

private final String INDEX_NAME = "remote-store-test-idx-1";

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();
}

/**
* Checks that the remote index path file gets created for the intended remote store path type and does not get created
* wherever not required.
*/
public void testRemoteIndexPathFileCreation() throws ExecutionException, InterruptedException, IOException {
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNodes(2);

// Case 1 - Hashed_prefix, we would need the remote index path file to be created.
client(clusterManagerNode).admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.HASHED_PREFIX)
)
.get();

createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 1));
validateRemoteIndexPathFile(true);
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());
FileSystemUtils.deleteSubDirectories(translogRepoPath);
FileSystemUtils.deleteSubDirectories(segmentRepoPath);

// Case 2 - Hashed_infix, we would not have the remote index path file created here.
client(clusterManagerNode).admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.HASHED_INFIX)
)
.get();
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 1));
validateRemoteIndexPathFile(false);
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());

// Case 3 - fixed, we would not have the remote index path file created here either.
client(clusterManagerNode).admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.FIXED))
.get();
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 1));
validateRemoteIndexPathFile(false);
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());

}

private void validateRemoteIndexPathFile(boolean exists) {
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);

assertEquals(exists, FileSystemUtils.exists(translogRepoPath.resolve(RemoteIndexPath.DIR)));
assertEquals(
exists,
FileSystemUtils.exists(
translogRepoPath.resolve(RemoteIndexPath.DIR)
.resolve(String.format(Locale.ROOT, RemoteIndexPath.FILE_NAME_FORMAT, indexUUID))
)
);
assertEquals(exists, FileSystemUtils.exists(segmentRepoPath.resolve(RemoteIndexPath.DIR)));
assertEquals(
exists,
FileSystemUtils.exists(
segmentRepoPath.resolve(RemoteIndexPath.DIR)
.resolve(String.format(Locale.ROOT, RemoteIndexPath.FILE_NAME_FORMAT, indexUUID))
)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class RemoteTransferContainer implements Closeable {
private final String remoteFileName;
private final boolean failTransferIfFileExists;
private final WritePriority writePriority;
private final long expectedChecksum;
private final Long expectedChecksum;
private final OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier;
private final boolean isRemoteDataIntegritySupported;
private final AtomicBoolean readBlock = new AtomicBoolean();
Expand All @@ -79,7 +79,7 @@ public RemoteTransferContainer(
boolean failTransferIfFileExists,
WritePriority writePriority,
OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier,
long expectedChecksum,
Long expectedChecksum,
boolean isRemoteDataIntegritySupported
) {
this(
Expand Down Expand Up @@ -115,7 +115,7 @@ public RemoteTransferContainer(
boolean failTransferIfFileExists,
WritePriority writePriority,
OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier,
long expectedChecksum,
Long expectedChecksum,
boolean isRemoteDataIntegritySupported,
Map<String, String> metadata
) {
Expand Down Expand Up @@ -230,15 +230,15 @@ private LocalStreamSupplier<InputStreamContainer> getMultipartStreamSupplier(
}

private boolean isRemoteDataIntegrityCheckPossible() {
return isRemoteDataIntegritySupported;
return isRemoteDataIntegritySupported && Objects.nonNull(expectedChecksum);
}

private void finalizeUpload(boolean uploadSuccessful) throws IOException {
if (isRemoteDataIntegrityCheckPossible()) {
return;
}

if (uploadSuccessful) {
if (uploadSuccessful && Objects.nonNull(expectedChecksum)) {
long actualChecksum = getActualChecksum();
if (actualChecksum != expectedChecksum) {
throw new CorruptIndexException(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.core.action.ActionListener;
import org.opensearch.threadpool.ThreadPool;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;

/**
* Hook for running code that needs to be executed before the upload of index metadata. Here we have introduced a hook
* for index creation (also triggerred after enabling the remote cluster statement for the first time). The listener
* is intended to be run in parallel and async with the index metadata upload.
*
* @opensearch.internal
*/
public abstract class IndexMetadataUploadListener {

private final ExecutorService executorService;

public IndexMetadataUploadListener(ThreadPool threadPool, String threadPoolName) {
Objects.requireNonNull(threadPool);
Objects.requireNonNull(threadPoolName);
assert ThreadPool.THREAD_POOL_TYPES.containsKey(threadPoolName) && ThreadPool.Names.SAME.equals(threadPoolName) == false;
this.executorService = threadPool.executor(threadPoolName);
}

/**
* Runs before the new index upload of index metadata (or first time upload). The caller is expected to trigger
* onSuccess or onFailure of the {@code ActionListener}.
*
* @param indexMetadataList list of index metadata of new indexes (or first time index metadata upload).
* @param actionListener listener to be invoked on success or failure.
*/
public final void onNewIndexUpload(List<IndexMetadata> indexMetadataList, ActionListener<Void> actionListener) {
executorService.execute(() -> doOnNewIndexUpload(indexMetadataList, actionListener));
}

protected abstract void doOnNewIndexUpload(List<IndexMetadata> indexMetadataList, ActionListener<Void> actionListener);
}
Loading

0 comments on commit 3ea0a31

Please sign in to comment.