diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index 0914506e632dd..26d20199def6c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -59,13 +59,17 @@ public Settings indexSettings() { .build(); } + protected void putRepository(Path path) { + assertAcked( + clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", path)) + ); + } + @Before public void setup() { internalCluster().startClusterManagerOnlyNode(); Path absolutePath = randomRepoPath().toAbsolutePath(); - assertAcked( - clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath)) - ); + putRepository(absolutePath); } @After diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index f069950c11f17..f4feaea1a02f8 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -52,7 +52,7 @@ public Settings indexSettings() { return remoteStoreIndexSettings(0); } - private Settings remoteStoreIndexSettings(int numberOfReplicas) { + protected Settings remoteStoreIndexSettings(int numberOfReplicas) { return Settings.builder() .put(super.indexSettings()) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartFileCorruptionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartFileCorruptionIT.java new file mode 100644 index 0000000000000..8f375ca6e2b01 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartFileCorruptionIT.java @@ -0,0 +1,111 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore.multipart; + +import org.junit.After; +import org.junit.Before; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.UUIDs; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.IndexModule; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.plugins.Plugin; +import org.opensearch.remotestore.multipart.mocks.MockFsRepository; +import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.file.Path; +import java.util.Collection; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +public class RemoteStoreMultipartFileCorruptionIT extends OpenSearchIntegTestCase { + + protected static final String REPOSITORY_NAME = "test-remore-store-repo"; + private static final String INDEX_NAME = "remote-store-test-idx-1"; + + @Override + protected Collection> nodePlugins() { + return Stream.concat(super.nodePlugins().stream(), Stream.of(MockFsRepositoryPlugin.class)).collect(Collectors.toList()); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build(); + } + + @Before + public void setup() { + internalCluster().startClusterManagerOnlyNode(); + Path absolutePath = randomRepoPath().toAbsolutePath(); + putRepository(absolutePath); + } + + protected void putRepository(Path path) { + assertAcked( + clusterAdmin().preparePutRepository(REPOSITORY_NAME) + .setType(MockFsRepositoryPlugin.TYPE) + .setSettings( + Settings.builder() + .put("location", path) + // custom setting for MockFsRepositoryPlugin + .put(MockFsRepository.TRIGGER_DATA_INTEGRITY_FAILURE.getKey(), true) + ) + ); + } + + @After + public void teardown() { + assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); + } + + protected Settings remoteStoreIndexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put("index.refresh_interval", "300s") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME) + .build(); + } + + private IndexResponse indexSingleDoc() { + return client().prepareIndex(INDEX_NAME) + .setId(UUIDs.randomBase64UUID()) + .setSource(randomAlphaOfLength(5), randomAlphaOfLength(5)) + .get(); + } + + public void testLocalFileCorruptionDuringUpload() { + internalCluster().startDataOnlyNodes(1); + createIndex(INDEX_NAME, remoteStoreIndexSettings()); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + indexSingleDoc(); + + client().admin() + .indices() + .prepareRefresh(INDEX_NAME) + .setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_HIDDEN_FORBID_CLOSED) + .execute() + .actionGet(); + + // ensuring red cluster meaning shard has failed and is unassigned + ensureRed(INDEX_NAME); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java new file mode 100644 index 0000000000000..a523d5c0f5470 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore.multipart; + +import org.opensearch.common.settings.Settings; +import org.opensearch.plugins.Plugin; +import org.opensearch.remotestore.RemoteStoreIT; +import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin; + +import java.nio.file.Path; +import java.util.Collection; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +public class RemoteStoreMultipartIT extends RemoteStoreIT { + + @Override + protected Collection> nodePlugins() { + return Stream.concat(super.nodePlugins().stream(), Stream.of(MockFsRepositoryPlugin.class)).collect(Collectors.toList()); + } + + @Override + protected void putRepository(Path path) { + assertAcked( + clusterAdmin().preparePutRepository(REPOSITORY_NAME) + .setType(MockFsRepositoryPlugin.TYPE) + .setSettings(Settings.builder().put("location", path)) + ); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsBlobContainer.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsBlobContainer.java new file mode 100644 index 0000000000000..a56f0a338d296 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsBlobContainer.java @@ -0,0 +1,126 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore.multipart.mocks; + +import org.apache.lucene.index.CorruptIndexException; +import org.opensearch.common.io.InputStreamContainer; +import org.opensearch.common.StreamContext; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.fs.FsBlobContainer; +import org.opensearch.common.blobstore.fs.FsBlobStore; +import org.opensearch.common.blobstore.stream.write.WriteContext; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +public class MockFsBlobContainer extends FsBlobContainer { + + private static final int TRANSFER_TIMEOUT_MILLIS = 30000; + + private final boolean triggerDataIntegrityFailure; + + public MockFsBlobContainer(FsBlobStore blobStore, BlobPath blobPath, Path path, boolean triggerDataIntegrityFailure) { + super(blobStore, blobPath, path); + this.triggerDataIntegrityFailure = triggerDataIntegrityFailure; + } + + @Override + public boolean isMultiStreamUploadSupported() { + return true; + } + + @Override + public CompletableFuture writeBlobByStreams(WriteContext writeContext) throws IOException { + CompletableFuture completableFuture = new CompletableFuture<>(); + + int nParts = 10; + long partSize = writeContext.getFileSize() / nParts; + StreamContext streamContext = writeContext.getStreamProvider(partSize); + final Path file = path.resolve(writeContext.getFileName()); + byte[] buffer = new byte[(int) writeContext.getFileSize()]; + AtomicLong totalContentRead = new AtomicLong(); + CountDownLatch latch = new CountDownLatch(streamContext.getNumberOfParts()); + for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) { + int finalPartIdx = partIdx; + Thread thread = new Thread(() -> { + try { + InputStreamContainer inputStreamContainer = streamContext.provideStream(finalPartIdx); + InputStream inputStream = inputStreamContainer.getInputStream(); + long remainingContentLength = inputStreamContainer.getContentLength(); + long offset = partSize * finalPartIdx; + while (remainingContentLength > 0) { + int readContentLength = inputStream.read(buffer, (int) offset, (int) remainingContentLength); + totalContentRead.addAndGet(readContentLength); + remainingContentLength -= readContentLength; + offset += readContentLength; + } + inputStream.close(); + } catch (IOException e) { + completableFuture.completeExceptionally(e); + } finally { + latch.countDown(); + } + }); + thread.start(); + } + try { + if (!latch.await(TRANSFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + throw new IOException("Timed out waiting for file transfer to complete for " + writeContext.getFileName()); + } + } catch (InterruptedException e) { + throw new IOException("Await interrupted on CountDownLatch, transfer failed for " + writeContext.getFileName()); + } + try (OutputStream outputStream = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) { + outputStream.write(buffer); + } + if (writeContext.getFileSize() != totalContentRead.get()) { + throw new IOException( + "Incorrect content length read for file " + + writeContext.getFileName() + + ", actual file size: " + + writeContext.getFileSize() + + ", bytes read: " + + totalContentRead.get() + ); + } + + try { + // bulks need to succeed for segment files to be generated + if (isSegmentFile(writeContext.getFileName()) && triggerDataIntegrityFailure) { + completableFuture.completeExceptionally( + new RuntimeException( + new CorruptIndexException( + "Data integrity check failure for file: " + writeContext.getFileName(), + writeContext.getFileName() + ) + ) + ); + } else { + writeContext.getUploadFinalizer().accept(true); + completableFuture.complete(null); + } + } catch (Exception e) { + completableFuture.completeExceptionally(e); + } + + return completableFuture; + } + + private boolean isSegmentFile(String filename) { + return !filename.endsWith(".tlog") && !filename.endsWith(".ckp"); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsBlobStore.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsBlobStore.java new file mode 100644 index 0000000000000..435be3f8e66f2 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsBlobStore.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore.multipart.mocks; + +import org.opensearch.OpenSearchException; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.fs.FsBlobStore; + +import java.io.IOException; +import java.nio.file.Path; + +public class MockFsBlobStore extends FsBlobStore { + + private final boolean triggerDataIntegrityFailure; + + public MockFsBlobStore(int bufferSizeInBytes, Path path, boolean readonly, boolean triggerDataIntegrityFailure) throws IOException { + super(bufferSizeInBytes, path, readonly); + this.triggerDataIntegrityFailure = triggerDataIntegrityFailure; + } + + @Override + public BlobContainer blobContainer(BlobPath path) { + try { + return new MockFsBlobContainer(this, path, buildAndCreate(path), triggerDataIntegrityFailure); + } catch (IOException ex) { + throw new OpenSearchException("failed to create blob container", ex); + } + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsRepository.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsRepository.java new file mode 100644 index 0000000000000..15a9853477081 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsRepository.java @@ -0,0 +1,46 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore.multipart.mocks; + +import org.opensearch.cluster.metadata.RepositoryMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.fs.FsBlobStore; +import org.opensearch.common.settings.Setting; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.repositories.fs.FsRepository; + +public class MockFsRepository extends FsRepository { + + public static Setting TRIGGER_DATA_INTEGRITY_FAILURE = Setting.boolSetting( + "mock_fs_repository.trigger_data_integrity_failure", + false + ); + + private final boolean triggerDataIntegrityFailure; + + public MockFsRepository( + RepositoryMetadata metadata, + Environment environment, + NamedXContentRegistry namedXContentRegistry, + ClusterService clusterService, + RecoverySettings recoverySettings + ) { + super(metadata, environment, namedXContentRegistry, clusterService, recoverySettings); + triggerDataIntegrityFailure = TRIGGER_DATA_INTEGRITY_FAILURE.get(metadata.settings()); + } + + @Override + protected BlobStore createBlobStore() throws Exception { + FsBlobStore fsBlobStore = (FsBlobStore) super.createBlobStore(); + return new MockFsBlobStore(fsBlobStore.bufferSizeInBytes(), fsBlobStore.path(), isReadOnly(), triggerDataIntegrityFailure); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsRepositoryPlugin.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsRepositoryPlugin.java new file mode 100644 index 0000000000000..ffd53adf4e29e --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsRepositoryPlugin.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore.multipart.mocks; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.RepositoryPlugin; +import org.opensearch.repositories.Repository; + +import java.util.Collections; +import java.util.Map; + +public class MockFsRepositoryPlugin extends Plugin implements RepositoryPlugin { + + public static final String TYPE = "fs_multipart_repository"; + + @Override + public Map getRepositories( + Environment env, + NamedXContentRegistry namedXContentRegistry, + ClusterService clusterService, + RecoverySettings recoverySettings + ) { + return Collections.singletonMap( + "fs_multipart_repository", + metadata -> new MockFsRepository(metadata, env, namedXContentRegistry, clusterService, recoverySettings) + ); + } +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java index ac38768c9f3d3..c1fb0816be160 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java @@ -32,12 +32,15 @@ package org.opensearch.common.blobstore; +import org.opensearch.common.blobstore.stream.write.WriteContext; + import java.io.IOException; import java.io.InputStream; import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; /** * An interface for managing a repository of blob entries, where each blob entry is just a named group of bytes. @@ -124,6 +127,33 @@ default long readBlobPreferredLength() { */ void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException; + /** + * Used to check whether vendor plugin support for parallel upload of multiple streams is enabled or not. + * Returns false by default + * + * @return If multi-stream parallel uploads are supported + */ + default boolean isMultiStreamUploadSupported() { + return false; + } + + default boolean isRemoteDataIntegritySupported() { + return false; + } + + /** + * Reads blob content from multiple streams, each from a specific part of the file, which is provided by the + * StreamContextSupplier in the WriteContext passed to this method. An {@link IOException} is thrown if reading + * any of the input streams fails, or writing to the target blob fails + * + * @param writeContext A WriteContext object encapsulating all information needed to perform the upload + * @return A {@link CompletableFuture} representing the upload + * @throws IOException if any of the input streams could not be read, or the target blob could not be written to + */ + default CompletableFuture writeBlobByStreams(WriteContext writeContext) throws IOException { + throw new UnsupportedOperationException(); + } + /** * Reads blob content from the input stream and writes it to the container in a new blob with the given name, * using an atomic write operation if the implementation supports it. diff --git a/server/src/main/java/org/opensearch/common/util/ByteUtils.java b/server/src/main/java/org/opensearch/common/util/ByteUtils.java index 36ae3b1f5bcaa..8c7665d991751 100644 --- a/server/src/main/java/org/opensearch/common/util/ByteUtils.java +++ b/server/src/main/java/org/opensearch/common/util/ByteUtils.java @@ -61,6 +61,16 @@ public static void writeLongLE(long l, byte[] arr, int offset) { assert l == 0; } + /** Convert long to a byte array in big-endian format */ + public static byte[] toByteArrayBE(long l) { + byte[] result = new byte[8]; + for (int i = 7; i >= 0; i--) { + result[i] = (byte) (l & 0xffL); + l >>= 8; + } + return result; + } + /** Write a long in little-endian format. */ public static long readLongLE(byte[] arr, int offset) { long l = arr[offset++] & 0xFFL; diff --git a/server/src/main/java/org/opensearch/index/shard/FileUploader.java b/server/src/main/java/org/opensearch/index/shard/FileUploader.java new file mode 100644 index 0000000000000..98f04f76d1594 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/shard/FileUploader.java @@ -0,0 +1,88 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.shard; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.opensearch.common.CheckedFunction; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; + +import java.io.IOException; +import java.util.Collection; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class is a wrapper over the copying of file from local to remote store allowing to decorate the actual copy + * method along with adding hooks of code that can be run before, on success and on failure. + * + * @opensearch.internal + */ +public class FileUploader { + + private static final Logger logger = LogManager.getLogger(FileUploader.class); + + private final UploadTracker uploadTracker; + + private final RemoteSegmentStoreDirectory remoteDirectory; + + private final Directory storeDirectory; + + private final Set excludeFiles; + + private final CheckedFunction checksumProvider; + + public FileUploader( + UploadTracker uploadTracker, + RemoteSegmentStoreDirectory remoteDirectory, + Directory storeDirectory, + Set excludeFiles, + CheckedFunction checksumProvider + ) { + this.uploadTracker = uploadTracker; + this.remoteDirectory = remoteDirectory; + this.storeDirectory = storeDirectory; + this.excludeFiles = excludeFiles; + this.checksumProvider = checksumProvider; + } + + /** + * Calling this method will filter out files that need to be skipped and call + * {@link RemoteSegmentStoreDirectory#copyFilesFrom} + * + * @param files The files that need to be uploaded + * @return A boolean for whether all files were successful or not + * @throws Exception when the underlying upload fails + */ + public boolean uploadFiles(Collection files) throws Exception { + Collection filteredFiles = files.stream().filter(file -> !skipUpload(file)).collect(Collectors.toList()); + return remoteDirectory.copyFilesFrom(storeDirectory, filteredFiles, IOContext.DEFAULT, uploadTracker); + } + + /** + * Whether to upload a file or not depending on whether file is in excluded list or has been already uploaded. + * + * @param file that needs to be uploaded. + * @return true if the upload has to be skipped for the file. + */ + private boolean skipUpload(String file) { + try { + // Exclude files that are already uploaded and the exclude files to come up with the list of files to be uploaded. + return excludeFiles.contains(file) || remoteDirectory.containsFile(file, checksumProvider.apply(file)); + } catch (IOException e) { + logger.error( + "Exception while reading checksum of local segment file: {}, ignoring the exception and re-uploading the file", + file + ); + } + return false; + } +} diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 88b71a92d7340..0f578e73ef62e 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.search.ReferenceManager; @@ -21,7 +22,6 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.opensearch.action.bulk.BackoffPolicy; -import org.opensearch.common.CheckedFunction; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ConcurrentCollections; @@ -152,7 +152,7 @@ public void onFailure(String file) { // Track upload failure segmentTracker.addUploadBytesFailed(latestFileNameSizeOnLocalMap.get(file)); } - }, remoteDirectory, storeDirectory, this::getChecksumOfLocalFile); + }, remoteDirectory, storeDirectory, EXCLUDE_FILES, this::getChecksumOfLocalFile); } @Override @@ -367,14 +367,15 @@ String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos s private boolean uploadNewSegments(Collection localSegmentsPostRefresh) throws IOException { AtomicBoolean uploadSuccess = new AtomicBoolean(true); - localSegmentsPostRefresh.forEach(file -> { - try { - fileUploader.uploadFile(file); - } catch (IOException e) { - uploadSuccess.set(false); - logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e); + try { + uploadSuccess.set(fileUploader.uploadFiles(localSegmentsPostRefresh)); + } catch (Exception e) { + uploadSuccess.set(false); + if (e instanceof CorruptIndexException) { + indexShard.failShard(e.getMessage(), e); } - }); + logger.warn(() -> new ParameterizedMessage("Exception: [{}] while uploading segment files", e), e); + } return uploadSuccess.get(); } @@ -455,100 +456,4 @@ private void updateFinalUploadStatusInSegmentTracker(boolean uploadStatus, long segmentTracker.incrementTotalUploadsFailed(); } } - - /** - * This class is a wrapper over the copying of file from local to remote store allowing to decorate the actual copy - * method along with adding hooks of code that can be run before, on success and on failure. - * - * @opensearch.internal - */ - private static class FileUploader { - - private final UploadTracker uploadTracker; - - private final RemoteSegmentStoreDirectory remoteDirectory; - - private final Directory storeDirectory; - - private final CheckedFunction checksumProvider; - - public FileUploader( - UploadTracker uploadTracker, - RemoteSegmentStoreDirectory remoteDirectory, - Directory storeDirectory, - CheckedFunction checksumProvider - ) { - this.uploadTracker = uploadTracker; - this.remoteDirectory = remoteDirectory; - this.storeDirectory = storeDirectory; - this.checksumProvider = checksumProvider; - } - - /** - * Calling this method will lead to before getting executed and then the actual upload. Based on the upload status, - * the onSuccess or onFailure method gets invoked. - * - * @param file the file which is to be uploaded. - * @throws IOException is thrown if the upload fails. - */ - private void uploadFile(String file) throws IOException { - if (skipUpload(file)) { - return; - } - uploadTracker.beforeUpload(file); - boolean success = false; - try { - performUpload(file); - uploadTracker.onSuccess(file); - success = true; - } finally { - if (!success) { - uploadTracker.onFailure(file); - } - } - } - - /** - * Whether to upload a file or not depending on whether file is in excluded list or has been already uploaded. - * - * @param file that needs to be uploaded. - * @return true if the upload has to be skipped for the file. - */ - private boolean skipUpload(String file) { - try { - // Exclude files that are already uploaded and the exclude files to come up with the list of files to be uploaded. - return EXCLUDE_FILES.contains(file) || remoteDirectory.containsFile(file, checksumProvider.apply(file)); - } catch (IOException e) { - logger.error( - "Exception while reading checksum of local segment file: {}, ignoring the exception and re-uploading the file", - file - ); - } - return false; - } - - /** - * This method does the actual upload. - * - * @param file that needs to be uploaded. - * @throws IOException is thrown if the upload fails. - */ - private void performUpload(String file) throws IOException { - remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT); - } - } - - /** - * A tracker class that is fed to FileUploader. - * - * @opensearch.internal - */ - interface UploadTracker { - - void beforeUpload(String file); - - void onSuccess(String file); - - void onFailure(String file); - } } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index bb2ff47cf0072..185dfcb60e13a 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -37,6 +37,10 @@ public class RemoteDirectory extends Directory { protected final BlobContainer blobContainer; + public BlobContainer getBlobContainer() { + return blobContainer; + } + public RemoteDirectory(BlobContainer blobContainer) { this.blobContainer = blobContainer; } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 9f41ac6f7fd17..79a3c44425b4d 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -8,37 +8,52 @@ package org.opensearch.index.store; +import com.jcraft.jzlib.JZlib; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; +import org.opensearch.ExceptionsHelper; import org.opensearch.common.UUIDs; +import org.opensearch.common.blobstore.stream.write.WriteContext; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; +import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.common.util.ByteUtils; +import org.opensearch.index.shard.UploadTracker; +import org.opensearch.index.store.exception.ChecksumCombinationException; +import org.opensearch.index.store.lockmanager.FileLockInfo; import org.opensearch.index.store.lockmanager.RemoteStoreCommitLevelLockManager; import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; -import org.opensearch.index.store.lockmanager.FileLockInfo; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; -import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; import java.io.IOException; import java.nio.file.NoSuchFileException; -import java.util.Map; -import java.util.HashSet; -import java.util.Optional; -import java.util.HashMap; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import java.util.zip.CRC32; /** * A RemoteDirectory extension for remote segment store. We need to make sure we don't overwrite a segment file once uploaded. @@ -57,6 +72,11 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement */ public static final String SEGMENT_NAME_UUID_SEPARATOR = "__"; + /** + * Number of bytes in the segment file to store checksum + */ + private static final int SEGMENT_CHECKSUM_BYTES = 8; + public static final MetadataFilenameUtils.MetadataFilenameComparator METADATA_FILENAME_COMPARATOR = new MetadataFilenameUtils.MetadataFilenameComparator(); @@ -325,6 +345,132 @@ public IndexInput openInput(String name, IOContext context) throws IOException { } } + /** + * Copies a list of files from the source directory to a remote based on multi-stream upload support. + * If vendor plugin supports uploading multiple parts in parallel, BlobContainer#writeBlobByStreams + * will be used, else, the legacy {@link RemoteSegmentStoreDirectory#copyFrom(Directory, String, String, IOContext, boolean)} + * will be called. + * + * @param from The directory for all files to be uploaded + * @param files A list containing the names of all files to be uploaded + * @param context IOContext to be used to open IndexInput to files during remote upload + * @param uploadTracker An {@link UploadTracker} for tracking file uploads + * @throws Exception When upload future creation fails or if {@link RemoteSegmentStoreDirectory#copyFrom(Directory, String, String, IOContext, boolean)} + * throws an exception + */ + public boolean copyFilesFrom(Directory from, Collection files, IOContext context, UploadTracker uploadTracker) + throws Exception { + + List> resultFutures = new ArrayList<>(); + + boolean uploadOfAllFilesSuccessful = true; + for (String src : files) { + String remoteFilename = createRemoteFileName(src, false); + uploadTracker.beforeUpload(src); + if (remoteDataDirectory.getBlobContainer().isMultiStreamUploadSupported()) { + try { + CompletableFuture resultFuture = createUploadFuture(from, src, remoteFilename, context); + resultFuture.whenComplete((uploadResponse, throwable) -> { + if (throwable != null) { + uploadTracker.onFailure(src); + } else { + uploadTracker.onSuccess(src); + } + }); + resultFutures.add(resultFuture); + } catch (Exception e) { + uploadTracker.onFailure(src); + throw e; + } + } else { + boolean success = true; + try { + copyFrom(from, src, src, context, false); + uploadTracker.onSuccess(src); + } catch (IOException e) { + success = false; + uploadOfAllFilesSuccessful = false; + logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", src), e); + } finally { + if (!success) { + uploadTracker.onFailure(src); + } + } + } + } + + if (resultFutures.isEmpty() == false) { + CompletableFuture resultFuture = CompletableFuture.allOf(resultFutures.toArray(new CompletableFuture[0])); + try { + resultFuture.get(); + } catch (ExecutionException e) { + IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(e); + if (corruptIndexException != null) { + throw corruptIndexException; + } + throw e; + } + } + + return uploadOfAllFilesSuccessful; + } + + private CompletableFuture createUploadFuture(Directory from, String src, String remoteFileName, IOContext ioContext) + throws Exception { + + AtomicReference exceptionRef = new AtomicReference<>(); + long expectedChecksum = calculateChecksumOfChecksum(from, src); + long contentLength; + try (IndexInput indexInput = from.openInput(src, ioContext)) { + contentLength = indexInput.length(); + } + RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + src, + remoteFileName, + contentLength, + true, + WritePriority.NORMAL, + (size, position) -> new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position), + expectedChecksum, + remoteDataDirectory.getBlobContainer().isRemoteDataIntegritySupported(), + false + ); + WriteContext writeContext = remoteTransferContainer.createWriteContext(); + CompletableFuture uploadFuture = remoteDataDirectory.getBlobContainer().writeBlobByStreams(writeContext); + return uploadFuture.whenComplete((resp, throwable) -> { + try { + remoteTransferContainer.close(); + } catch (Exception e) { + logger.warn("Error occurred while closing streams", e); + } + if (throwable != null) { + handleException(throwable, exceptionRef); + } else { + try { + postUpload(from, src, remoteFileName, getChecksumOfLocalFile(from, src)); + } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("Exception in segment postUpload for file [{}]", src), e); + handleException(e, exceptionRef); + } + } + }); + } + + private void handleException(Throwable throwable, AtomicReference exceptionRef) { + Exception ex; + if (throwable instanceof Exception) { + ex = (Exception) throwable; + } else { + ex = new RuntimeException(throwable); + } + + if (exceptionRef.get() == null) { + exceptionRef.set(ex); + } else { + exceptionRef.get().addSuppressed(ex); + } + } + /** * This acquires a lock on a given commit by creating a lock file in lock directory using {@code FileLockInfo} * @param primaryTerm Primary Term of index at the time of commit. @@ -392,23 +538,33 @@ String getMetadataFileForCommit(long primaryTerm, long generation) throws IOExce return metadataFiles.iterator().next(); } - public void copyFrom(Directory from, String src, String dest, IOContext context, boolean useCommonSuffix, String checksum) - throws IOException { + private String createRemoteFileName(String dest, boolean useCommonSuffix) { String remoteFilename; if (useCommonSuffix) { remoteFilename = dest + SEGMENT_NAME_UUID_SEPARATOR + this.commonFilenameSuffix; } else { remoteFilename = getNewRemoteSegmentFilename(dest); } + + return remoteFilename; + } + + public void copyFrom(Directory from, String src, String dest, IOContext context, boolean useCommonSuffix, String checksum) + throws IOException { + String remoteFilename = createRemoteFileName(dest, useCommonSuffix); remoteDataDirectory.copyFrom(from, src, remoteFilename, context); - UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum, from.fileLength(src)); - segmentsUploadedToRemoteStore.put(src, segmentMetadata); + postUpload(from, src, remoteFilename, checksum); } public void copyFrom(Directory from, String src, String dest, IOContext context, boolean useCommonSuffix) throws IOException { copyFrom(from, src, dest, context, useCommonSuffix, getChecksumOfLocalFile(from, src)); } + private void postUpload(Directory from, String src, String remoteFilename, String checksum) throws IOException { + UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum, from.fileLength(src)); + segmentsUploadedToRemoteStore.put(src, segmentMetadata); + } + /** * Copies an existing src file from directory from to a non-existent file dest in this directory. * Once the segment is uploaded to remote segment store, update the cache accordingly. @@ -466,6 +622,27 @@ private String getChecksumOfLocalFile(Directory directory, String file) throws I } } + private long calculateChecksumOfChecksum(Directory directory, String file) throws IOException { + try (IndexInput indexInput = directory.openInput(file, IOContext.DEFAULT)) { + long storedChecksum = CodecUtil.retrieveChecksum(indexInput); + CRC32 checksumOfChecksum = new CRC32(); + checksumOfChecksum.update(ByteUtils.toByteArrayBE(storedChecksum)); + try { + return JZlib.crc32_combine(storedChecksum, checksumOfChecksum.getValue(), SEGMENT_CHECKSUM_BYTES); + } catch (Exception e) { + throw new ChecksumCombinationException( + "Potentially corrupted file: Checksum combination failed while combining stored checksum " + + "and calculated checksum of stored checksum in segment file: " + + file + + ", directory: " + + directory, + file, + e + ); + } + } + } + private String getExistingRemoteFilename(String localFilename) { if (segmentsUploadedToRemoteStore.containsKey(localFilename)) { return segmentsUploadedToRemoteStore.get(localFilename).uploadedFilename; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 88fe816ccb462..3979aaf4a9f8d 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -15,13 +15,24 @@ import org.opensearch.action.ActionRunnable; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.stream.write.WriteContext; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeFileInputStream; +import org.opensearch.index.translog.ChannelFactory; +import org.opensearch.index.translog.checked.TranslogCheckedContainer; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.io.InputStream; +import java.nio.channels.FileChannel; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; /** * Service that handles remote transfer of translog and checkpoint files @@ -41,18 +52,18 @@ public BlobStoreTransferService(BlobStore blobStore, ThreadPool threadPool) { } @Override - public void uploadBlobAsync( - String threadpoolName, + public void uploadBlobByThreadPool( + String threadPoolName, final TransferFileSnapshot fileSnapshot, Iterable remoteTransferPath, - ActionListener listener + ActionListener listener, + WritePriority writePriority ) { assert remoteTransferPath instanceof BlobPath; BlobPath blobPath = (BlobPath) remoteTransferPath; - threadPool.executor(threadpoolName).execute(ActionRunnable.wrap(listener, l -> { - try (InputStream inputStream = fileSnapshot.inputStream()) { - blobStore.blobContainer(blobPath) - .writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true); + threadPool.executor(threadPoolName).execute(ActionRunnable.wrap(listener, l -> { + try { + uploadBlob(fileSnapshot, blobPath, writePriority); l.onResponse(fileSnapshot); } catch (Exception e) { logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), e); @@ -62,14 +73,106 @@ public void uploadBlobAsync( } @Override - public void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable remoteTransferPath) throws IOException { - assert remoteTransferPath instanceof BlobPath; + public void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable remoteTransferPath, WritePriority writePriority) + throws IOException { BlobPath blobPath = (BlobPath) remoteTransferPath; try (InputStream inputStream = fileSnapshot.inputStream()) { blobStore.blobContainer(blobPath).writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true); } } + @Override + public void uploadBlobs( + Set fileSnapshots, + final Map blobPaths, + ActionListener listener, + WritePriority writePriority + ) { + List> resultFutures = new ArrayList<>(); + fileSnapshots.forEach(fileSnapshot -> { + BlobPath blobPath = blobPaths.get(fileSnapshot.getPrimaryTerm()); + if (!blobStore.blobContainer(blobPath).isMultiStreamUploadSupported()) { + uploadBlobByThreadPool(ThreadPool.Names.TRANSLOG_TRANSFER, fileSnapshot, blobPath, listener, writePriority); + } else { + CompletableFuture resultFuture = createUploadFuture(fileSnapshot, listener, blobPath, writePriority); + if (resultFuture != null) { + resultFutures.add(resultFuture); + } + } + }); + + if (resultFutures.isEmpty() == false) { + CompletableFuture resultFuture = CompletableFuture.allOf(resultFutures.toArray(new CompletableFuture[0])); + try { + resultFuture.get(); + } catch (Exception e) { + logger.warn("Failed to upload blobs", e); + } + } + } + + private CompletableFuture createUploadFuture( + TransferFileSnapshot fileSnapshot, + ActionListener listener, + BlobPath blobPath, + WritePriority writePriority + ) { + + CompletableFuture resultFuture = null; + try { + ChannelFactory channelFactory = FileChannel::open; + long contentLength; + long expectedChecksum; + try (FileChannel channel = channelFactory.open(fileSnapshot.getPath(), StandardOpenOption.READ)) { + contentLength = channel.size(); + TranslogCheckedContainer translogCheckedContainer = new TranslogCheckedContainer( + channel, + 0, + (int) contentLength, + fileSnapshot.getName() + ); + expectedChecksum = translogCheckedContainer.getChecksum(); + } + RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + fileSnapshot.getName(), + fileSnapshot.getName(), + contentLength, + true, + writePriority, + (size, position) -> new OffsetRangeFileInputStream(fileSnapshot.getPath(), size, position), + expectedChecksum, + blobStore.blobContainer(blobPath).isRemoteDataIntegritySupported(), + false + ); + WriteContext writeContext = remoteTransferContainer.createWriteContext(); + CompletableFuture uploadFuture = blobStore.blobContainer(blobPath).writeBlobByStreams(writeContext); + resultFuture = uploadFuture.whenComplete((resp, throwable) -> { + try { + remoteTransferContainer.close(); + } catch (Exception e) { + logger.warn("Error occurred while closing streams", e); + } + if (throwable != null) { + logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), throwable); + listener.onFailure(new FileTransferException(fileSnapshot, throwable)); + } else { + listener.onResponse(fileSnapshot); + } + }); + } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), e); + listener.onFailure(new FileTransferException(fileSnapshot, e)); + } finally { + try { + fileSnapshot.close(); + } catch (IOException e) { + logger.warn("Error while closing TransferFileSnapshot", e); + } + } + + return resultFuture; + } + @Override public InputStream downloadBlob(Iterable path, String fileName) throws IOException { return blobStore.blobContainer((BlobPath) path).readBlob(fileName); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 6aca3055a3f53..18507cd48cbdb 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -9,11 +9,14 @@ package org.opensearch.index.translog.transfer; import org.opensearch.action.ActionListener; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import java.io.IOException; import java.io.InputStream; import java.util.List; +import java.util.Map; import java.util.Set; /** @@ -25,25 +28,40 @@ public interface TransferService { /** * Uploads the {@link TransferFileSnapshot} async, once the upload is complete the callback is invoked - * @param threadpoolName threadpool type which will be used to upload blobs asynchronously + * @param threadPoolName threadpool type which will be used to upload blobs asynchronously * @param fileSnapshot the file snapshot to upload * @param remotePath the remote path where upload should be made * @param listener the callback to be invoked once upload completes successfully/fails */ - void uploadBlobAsync( - String threadpoolName, + void uploadBlobByThreadPool( + String threadPoolName, final TransferFileSnapshot fileSnapshot, Iterable remotePath, - ActionListener listener + ActionListener listener, + WritePriority writePriority ); + /** + * Uploads multiple {@link TransferFileSnapshot}, once the upload is complete the callback is invoked + * @param fileSnapshots the file snapshots to upload + * @param blobPaths Primary term to {@link BlobPath} map + * @param listener the callback to be invoked once uploads complete successfully/fail + */ + void uploadBlobs( + Set fileSnapshots, + final Map blobPaths, + ActionListener listener, + WritePriority writePriority + ) throws Exception; + /** * Uploads the {@link TransferFileSnapshot} blob * @param fileSnapshot the file snapshot to upload * @param remotePath the remote path where upload should be made + * @param writePriority Priority by which content needs to be written. * @throws IOException the exception while transferring the data */ - void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable remotePath) throws IOException; + void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable remotePath, WritePriority writePriority) throws IOException; void deleteBlobs(Iterable path, List fileNames) throws IOException; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 243fd8801a562..d04d240b9c0a1 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -15,6 +15,7 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.LatchedActionListener; import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.Translog; @@ -106,14 +107,16 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans }), latch ); + Map blobPathMap = new HashMap<>(); toUpload.forEach( - fileSnapshot -> transferService.uploadBlobAsync( - ThreadPool.Names.TRANSLOG_TRANSFER, - fileSnapshot, - remoteBaseTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())), - latchedActionListener + fileSnapshot -> blobPathMap.put( + fileSnapshot.getPrimaryTerm(), + remoteBaseTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())) ) ); + + transferService.uploadBlobs(toUpload, blobPathMap, latchedActionListener, WritePriority.HIGH); + try { if (latch.await(TRANSFER_TIMEOUT_IN_MILLIS, TimeUnit.MILLISECONDS) == false) { Exception ex = new TimeoutException("Timed out waiting for transfer of snapshot " + transferSnapshot + " to complete"); @@ -126,7 +129,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans throw ex; } if (exceptionList.isEmpty()) { - transferService.uploadBlob(prepareMetadata(transferSnapshot), remoteMetadataTransferPath); + transferService.uploadBlob(prepareMetadata(transferSnapshot), remoteMetadataTransferPath, WritePriority.HIGH); translogTransferListener.onUploadComplete(transferSnapshot); return true; } else { diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index 2c91d5aa33090..7c53371b24c18 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -46,10 +46,10 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.OpenSearchExecutors; -import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.util.concurrent.XRejectedExecutionHandler; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.node.Node; @@ -62,8 +62,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledExecutorService; @@ -438,6 +440,16 @@ public ScheduledCancellable schedule(Runnable command, TimeValue delay, String e return new ScheduledCancellableAdapter(scheduler.schedule(command, delay.millis(), TimeUnit.MILLISECONDS)); } + public Future executeCallable(Callable command, String executorName) { + ExecutorService executorService = executor(executorName); + return executorService.submit(command); + } + + public boolean isExecutorShutDown(String executorName) { + ExecutorService executorService = executor(executorName); + return executorService.isShutdown(); + } + public void scheduleUnlessShuttingDown(TimeValue delay, String executor, Runnable command) { try { schedule(command, delay, executor); diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 10295ffc56812..a82616a4254c4 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -20,6 +20,8 @@ import org.apache.lucene.tests.util.LuceneTestCase; import org.junit.Before; import org.opensearch.common.UUIDs; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.store.ByteArrayIndexInput; @@ -33,6 +35,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.HashMap; import java.util.Collection; @@ -53,6 +57,7 @@ public class RemoteSegmentStoreDirectoryTests extends OpenSearchTestCase { private RemoteStoreMetadataLockManager mdLockManager; private RemoteSegmentStoreDirectory remoteSegmentStoreDirectory; + private TestUploadTracker testUploadTracker; @Before public void setup() throws IOException { @@ -61,6 +66,7 @@ public void setup() throws IOException { mdLockManager = mock(RemoteStoreMetadataLockManager.class); remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(remoteDataDirectory, remoteMetadataDirectory, mdLockManager); + testUploadTracker = new TestUploadTracker(); } public void testUploadedSegmentMetadataToString() { @@ -496,6 +502,99 @@ public void testCopyFrom() throws IOException { storeDirectory.close(); } + public void testCopyFilesFromMultipart() throws Exception { + String filename = "_100.si"; + populateMetadata(); + remoteSegmentStoreDirectory.init(); + + Directory storeDirectory = LuceneTestCase.newDirectory(); + IndexOutput indexOutput = storeDirectory.createOutput(filename, IOContext.DEFAULT); + indexOutput.writeString("Hello World!"); + CodecUtil.writeFooter(indexOutput); + indexOutput.close(); + storeDirectory.sync(List.of(filename)); + + assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); + + BlobContainer blobContainer = mock(BlobContainer.class); + when(remoteDataDirectory.getBlobContainer()).thenReturn(blobContainer); + when(blobContainer.isMultiStreamUploadSupported()).thenReturn(true); + CompletableFuture uploadResponseCompletableFuture = new CompletableFuture<>(); + uploadResponseCompletableFuture.complete(null); + when(blobContainer.writeBlobByStreams(any(WriteContext.class))).thenReturn(uploadResponseCompletableFuture); + + remoteSegmentStoreDirectory.copyFilesFrom(storeDirectory, List.of(filename), IOContext.DEFAULT, testUploadTracker); + + assertTrue(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); + assertEquals(TestUploadTracker.UploadStatus.UPLOAD_SUCCESS, testUploadTracker.getUploadStatus(filename)); + + storeDirectory.close(); + } + + public void testCopyFilesFromMultipartIOException() throws Exception { + String filename = "_100.si"; + populateMetadata(); + remoteSegmentStoreDirectory.init(); + + Directory storeDirectory = LuceneTestCase.newDirectory(); + IndexOutput indexOutput = storeDirectory.createOutput(filename, IOContext.DEFAULT); + indexOutput.writeString("Hello World!"); + CodecUtil.writeFooter(indexOutput); + indexOutput.close(); + storeDirectory.sync(List.of(filename)); + + assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); + + BlobContainer blobContainer = mock(BlobContainer.class); + when(remoteDataDirectory.getBlobContainer()).thenReturn(blobContainer); + when(blobContainer.isMultiStreamUploadSupported()).thenReturn(true); + CompletableFuture uploadResponseCompletableFuture = new CompletableFuture<>(); + uploadResponseCompletableFuture.complete(null); + when(blobContainer.writeBlobByStreams(any(WriteContext.class))).thenThrow(new IOException()); + + assertThrows( + IOException.class, + () -> remoteSegmentStoreDirectory.copyFilesFrom(storeDirectory, List.of(filename), IOContext.DEFAULT, testUploadTracker) + ); + + assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); + assertEquals(TestUploadTracker.UploadStatus.UPLOAD_FAILURE, testUploadTracker.getUploadStatus(filename)); + + storeDirectory.close(); + } + + public void testCopyFilesFromMultipartUploadFutureCompletedExceptionally() throws Exception { + String filename = "_100.si"; + populateMetadata(); + remoteSegmentStoreDirectory.init(); + + Directory storeDirectory = LuceneTestCase.newDirectory(); + IndexOutput indexOutput = storeDirectory.createOutput(filename, IOContext.DEFAULT); + indexOutput.writeString("Hello World!"); + CodecUtil.writeFooter(indexOutput); + indexOutput.close(); + storeDirectory.sync(List.of(filename)); + + assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); + + BlobContainer blobContainer = mock(BlobContainer.class); + when(remoteDataDirectory.getBlobContainer()).thenReturn(blobContainer); + when(blobContainer.isMultiStreamUploadSupported()).thenReturn(true); + CompletableFuture uploadResponseCompletableFuture = new CompletableFuture<>(); + uploadResponseCompletableFuture.completeExceptionally(new IOException()); + when(blobContainer.writeBlobByStreams(any(WriteContext.class))).thenReturn(uploadResponseCompletableFuture); + + assertThrows( + ExecutionException.class, + () -> remoteSegmentStoreDirectory.copyFilesFrom(storeDirectory, List.of(filename), IOContext.DEFAULT, testUploadTracker) + ); + + assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); + assertEquals(TestUploadTracker.UploadStatus.UPLOAD_FAILURE, testUploadTracker.getUploadStatus(filename)); + + storeDirectory.close(); + } + public void testCopyFromException() throws IOException { String filename = "_100.si"; Directory storeDirectory = LuceneTestCase.newDirectory(); diff --git a/server/src/test/java/org/opensearch/index/store/TestUploadTracker.java b/server/src/test/java/org/opensearch/index/store/TestUploadTracker.java new file mode 100644 index 0000000000000..05250d69b7867 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/TestUploadTracker.java @@ -0,0 +1,43 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.opensearch.index.shard.UploadTracker; + +import java.util.concurrent.ConcurrentHashMap; + +public class TestUploadTracker implements UploadTracker { + + private final ConcurrentHashMap uploadStatusMap = new ConcurrentHashMap<>(); + + enum UploadStatus { + BEFORE_UPLOAD, + UPLOAD_SUCCESS, + UPLOAD_FAILURE + } + + @Override + public void beforeUpload(String file) { + uploadStatusMap.put(file, UploadStatus.BEFORE_UPLOAD); + } + + @Override + public void onSuccess(String file) { + uploadStatusMap.put(file, UploadStatus.UPLOAD_SUCCESS); + } + + @Override + public void onFailure(String file) { + uploadStatusMap.put(file, UploadStatus.UPLOAD_FAILURE); + } + + public UploadStatus getUploadStatus(String file) { + return uploadStatusMap.get(file); + } +} diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceMockRepositoryTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceMockRepositoryTests.java new file mode 100644 index 0000000000000..3020703cd398f --- /dev/null +++ b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceMockRepositoryTests.java @@ -0,0 +1,174 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.LatchedActionListener; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.stream.write.WriteContext; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.Collections; +import java.util.HashMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class BlobStoreTransferServiceMockRepositoryTests extends OpenSearchTestCase { + + private ThreadPool threadPool; + + private BlobStore blobStore; + + @Override + public void setUp() throws Exception { + super.setUp(); + blobStore = mock(BlobStore.class); + threadPool = new TestThreadPool(getClass().getName()); + } + + public void testUploadBlobs() throws Exception { + Path testFile = createTempFile(); + Files.write(testFile, randomByteArrayOfLength(128), StandardOpenOption.APPEND); + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot(testFile, randomNonNegativeLong()); + + BlobContainer blobContainer = mock(BlobContainer.class); + when(blobContainer.isMultiStreamUploadSupported()).thenReturn(true); + CompletableFuture uploadResponseCompletableFuture = new CompletableFuture<>(); + uploadResponseCompletableFuture.complete(null); + when(blobContainer.writeBlobByStreams(any(WriteContext.class))).thenReturn(uploadResponseCompletableFuture); + when(blobStore.blobContainer(any(BlobPath.class))).thenReturn(blobContainer); + + TransferService transferService = new BlobStoreTransferService(blobStore, threadPool); + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean onResponseCalled = new AtomicBoolean(false); + AtomicReference exceptionRef = new AtomicReference<>(); + AtomicReference fileSnapshotRef = new AtomicReference<>(); + transferService.uploadBlobs(Collections.singleton(transferFileSnapshot), new HashMap<>() { + { + put(transferFileSnapshot.getPrimaryTerm(), new BlobPath().add("sample_path")); + } + }, new LatchedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(FileSnapshot.TransferFileSnapshot fileSnapshot) { + onResponseCalled.set(true); + fileSnapshotRef.set(fileSnapshot); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + } + }, latch), WritePriority.HIGH); + + assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); + verify(blobContainer).writeBlobByStreams(any(WriteContext.class)); + assertTrue(onResponseCalled.get()); + assertEquals(transferFileSnapshot.getPrimaryTerm(), fileSnapshotRef.get().getPrimaryTerm()); + assertEquals(transferFileSnapshot.getName(), fileSnapshotRef.get().getName()); + assertNull(exceptionRef.get()); + } + + public void testUploadBlobsIOException() throws Exception { + Path testFile = createTempFile(); + Files.write(testFile, randomByteArrayOfLength(128), StandardOpenOption.APPEND); + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot(testFile, randomNonNegativeLong()); + + BlobContainer blobContainer = mock(BlobContainer.class); + when(blobContainer.isMultiStreamUploadSupported()).thenReturn(true); + doThrow(new IOException()).when(blobContainer).writeBlobByStreams(any(WriteContext.class)); + when(blobStore.blobContainer(any(BlobPath.class))).thenReturn(blobContainer); + + TransferService transferService = new BlobStoreTransferService(blobStore, threadPool); + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean onResponseCalled = new AtomicBoolean(false); + AtomicReference exceptionRef = new AtomicReference<>(); + transferService.uploadBlobs(Collections.singleton(transferFileSnapshot), new HashMap<>() { + { + put(transferFileSnapshot.getPrimaryTerm(), new BlobPath().add("sample_path")); + } + }, new LatchedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(FileSnapshot.TransferFileSnapshot fileSnapshot) { + onResponseCalled.set(true); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + } + }, latch), WritePriority.HIGH); + + assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); + verify(blobContainer).writeBlobByStreams(any(WriteContext.class)); + assertFalse(onResponseCalled.get()); + assertTrue(exceptionRef.get() instanceof FileTransferException); + } + + public void testUploadBlobsUploadFutureCompletedExceptionally() throws Exception { + Path testFile = createTempFile(); + Files.write(testFile, randomByteArrayOfLength(128), StandardOpenOption.APPEND); + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot(testFile, randomNonNegativeLong()); + + BlobContainer blobContainer = mock(BlobContainer.class); + when(blobContainer.isMultiStreamUploadSupported()).thenReturn(true); + CompletableFuture uploadResponseCompletableFuture = new CompletableFuture<>(); + uploadResponseCompletableFuture.completeExceptionally(new IOException()); + when(blobContainer.writeBlobByStreams(any(WriteContext.class))).thenReturn(uploadResponseCompletableFuture); + when(blobStore.blobContainer(any(BlobPath.class))).thenReturn(blobContainer); + + TransferService transferService = new BlobStoreTransferService(blobStore, threadPool); + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean onResponseCalled = new AtomicBoolean(false); + AtomicReference exceptionRef = new AtomicReference<>(); + transferService.uploadBlobs(Collections.singleton(transferFileSnapshot), new HashMap<>() { + { + put(transferFileSnapshot.getPrimaryTerm(), new BlobPath().add("sample_path")); + } + }, new LatchedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(FileSnapshot.TransferFileSnapshot fileSnapshot) { + onResponseCalled.set(true); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + } + }, latch), WritePriority.HIGH); + + assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); + verify(blobContainer).writeBlobByStreams(any(WriteContext.class)); + assertFalse(onResponseCalled.get()); + assertTrue(exceptionRef.get() instanceof FileTransferException); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } +} diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java index 196fbd58c2c20..cc00c49892853 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java @@ -12,6 +12,7 @@ import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.env.Environment; @@ -51,7 +52,7 @@ public void testUploadBlob() throws IOException { Files.write(testFile, randomByteArrayOfLength(128), StandardOpenOption.APPEND); FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot(testFile, randomNonNegativeLong()); TransferService transferService = new BlobStoreTransferService(repository.blobStore(), threadPool); - transferService.uploadBlob(transferFileSnapshot, repository.basePath()); + transferService.uploadBlob(transferFileSnapshot, repository.basePath(), WritePriority.HIGH); } public void testUploadBlobFromByteArray() throws IOException { @@ -61,7 +62,7 @@ public void testUploadBlobFromByteArray() throws IOException { 1 ); TransferService transferService = new BlobStoreTransferService(repository.blobStore(), threadPool); - transferService.uploadBlob(transferFileSnapshot, repository.basePath()); + transferService.uploadBlob(transferFileSnapshot, repository.basePath(), WritePriority.NORMAL); } public void testUploadBlobAsync() throws IOException, InterruptedException { @@ -71,7 +72,7 @@ public void testUploadBlobAsync() throws IOException, InterruptedException { FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot(testFile, randomNonNegativeLong()); CountDownLatch latch = new CountDownLatch(1); TransferService transferService = new BlobStoreTransferService(repository.blobStore(), threadPool); - transferService.uploadBlobAsync( + transferService.uploadBlobByThreadPool( ThreadPool.Names.TRANSLOG_TRANSFER, transferFileSnapshot, repository.basePath(), @@ -87,7 +88,8 @@ public void onResponse(FileSnapshot.TransferFileSnapshot fileSnapshot) { public void onFailure(Exception e) { throw new AssertionError("Failed to perform uploadBlobAsync", e); } - }, latch) + }, latch), + WritePriority.HIGH ); assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); assertTrue(succeeded.get()); diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 1c485dbc35c63..d30bdb12075d7 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -14,6 +14,7 @@ import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.util.set.Sets; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.Translog; @@ -35,6 +36,8 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; @@ -73,20 +76,24 @@ public void tearDown() throws Exception { } @SuppressWarnings("unchecked") - public void testTransferSnapshot() throws IOException { + public void testTransferSnapshot() throws Exception { AtomicInteger fileTransferSucceeded = new AtomicInteger(); AtomicInteger fileTransferFailed = new AtomicInteger(); AtomicInteger translogTransferSucceeded = new AtomicInteger(); AtomicInteger translogTransferFailed = new AtomicInteger(); doNothing().when(transferService) - .uploadBlob(any(TransferFileSnapshot.class), Mockito.eq(remoteBaseTransferPath.add(String.valueOf(primaryTerm)))); + .uploadBlob( + any(TransferFileSnapshot.class), + Mockito.eq(remoteBaseTransferPath.add(String.valueOf(primaryTerm))), + any(WritePriority.class) + ); doAnswer(invocationOnMock -> { - ActionListener listener = (ActionListener) invocationOnMock.getArguments()[3]; - listener.onResponse((TransferFileSnapshot) invocationOnMock.getArguments()[1]); + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + Set transferFileSnapshots = (Set) invocationOnMock.getArguments()[0]; + transferFileSnapshots.forEach(listener::onResponse); return null; - }).when(transferService) - .uploadBlobAsync(any(String.class), any(TransferFileSnapshot.class), any(BlobPath.class), any(ActionListener.class)); + }).when(transferService).uploadBlobs(anySet(), anyMap(), any(ActionListener.class), any(WritePriority.class)); FileTransferTracker fileTransferTracker = new FileTransferTracker(new ShardId("index", "indexUUid", 0)) { @Override