diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 4d66bdd6cf3d1..e484cc5ead578 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -42,7 +42,7 @@ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteStoreIT extends OpenSearchIntegTestCase { - private static final String REPOSITORY_NAME = "test-remore-store-repo"; + protected static final String REPOSITORY_NAME = "test-remore-store-repo"; private static final String INDEX_NAME = "remote-store-test-idx-1"; private static final String TOTAL_OPERATIONS = "total-operations"; private static final String REFRESHED_OR_FLUSHED_OPERATIONS = "refreshed-or-flushed-operations"; @@ -59,7 +59,7 @@ public Settings indexSettings() { return remoteStoreIndexSettings(0); } - private Settings remoteStoreIndexSettings(int numberOfReplicas) { + protected Settings remoteStoreIndexSettings(int numberOfReplicas) { return Settings.builder() .put(super.indexSettings()) .put("index.refresh_interval", "300s") @@ -94,14 +94,22 @@ protected Settings featureFlagSettings() { public void setup() { internalCluster().startClusterManagerOnlyNode(); Path absolutePath = randomRepoPath().toAbsolutePath(); + putRepository(absolutePath); + } + + protected void putRepository(Path path) { assertAcked( - clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath)) + clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", path)) ); } + protected void deleteRepository() { + assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); + } + @After public void teardown() { - assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); + deleteRepository(); } private IndexResponse indexSingleDoc() { diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java new file mode 100644 index 0000000000000..4645102b435ee --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java @@ -0,0 +1,43 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.remotestore.multipart; + +import org.opensearch.common.settings.Settings; +import org.opensearch.plugins.Plugin; +import org.opensearch.remotestore.RemoteStoreIT; +import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin; + +import java.nio.file.Path; +import java.util.Collection; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +public class RemoteStoreMultipartIT extends RemoteStoreIT { + + @Override + protected Collection<Class<? extends Plugin>> nodePlugins() { + return Stream.concat(super.nodePlugins().stream(), Stream.of(MockFsRepositoryPlugin.class)).collect(Collectors.toList()); + } + + @Override + protected void putRepository(Path path) { + assertAcked( + clusterAdmin().preparePutRepository(REPOSITORY_NAME) + .setType(MockFsRepositoryPlugin.TYPE) + .setSettings(Settings.builder().put("location", path)) + ); + } + + @Override + protected void deleteRepository() { + assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsBlobContainer.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsBlobContainer.java new file mode 100644 index 0000000000000..0647215bf07bc --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsBlobContainer.java @@ -0,0 +1,103 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore.multipart.mocks; + +import org.opensearch.common.Stream; +import org.opensearch.common.StreamProvider; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.fs.FsBlobContainer; +import org.opensearch.common.blobstore.fs.FsBlobStore; +import org.opensearch.common.blobstore.stream.StreamContext; +import org.opensearch.common.blobstore.stream.write.UploadResponse; +import org.opensearch.common.blobstore.stream.write.WriteContext; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +public class MockFsBlobContainer extends FsBlobContainer { + + private static final int TRANSFER_TIMEOUT_MILLIS = 30000; + + public MockFsBlobContainer(FsBlobStore blobStore, BlobPath blobPath, Path path) { + super(blobStore, blobPath, path); + } + + @Override + public boolean isMultiStreamUploadSupported() { + return true; + } + + @Override + public CompletableFuture<UploadResponse> writeBlobByStreams(WriteContext writeContext) throws IOException { + CompletableFuture<UploadResponse> completableFuture = new CompletableFuture<>(); + + int nParts = 10; + long partSize = writeContext.getFileSize() / nParts; + StreamContext streamContext = writeContext.getStreamContext(partSize); + StreamProvider streamProvider = streamContext.getStreamProvider(); + final Path file = path.resolve(writeContext.getFileName()); + byte[] buffer = new byte[(int) writeContext.getFileSize()]; + AtomicLong totalContentRead = new AtomicLong(); + CountDownLatch latch = new 
CountDownLatch(streamContext.getNumberOfParts()); + for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) { + int finalPartIdx = partIdx; + Thread thread = new Thread(() -> { + try { + Stream stream = streamProvider.provideStream(finalPartIdx); + InputStream inputStream = stream.getInputStream(); + long remainingContentLength = stream.getContentLength(); + long offset = stream.getOffset(); + while (remainingContentLength > 0) { + int readContentLength = inputStream.read(buffer, (int) offset, (int) remainingContentLength); + totalContentRead.addAndGet(readContentLength); + remainingContentLength -= readContentLength; + offset += readContentLength; + } + inputStream.close(); + } catch (IOException e) { + completableFuture.completeExceptionally(e); + } finally { + latch.countDown(); + } + }); + thread.start(); + } + try { + if (!latch.await(TRANSFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + throw new IOException("Timed out waiting for file transfer to complete for " + writeContext.getFileName()); + } + } catch (InterruptedException e) { + throw new IOException("Await interrupted on CountDownLatch, transfer failed for " + writeContext.getFileName()); + } + try (OutputStream outputStream = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) { + outputStream.write(buffer); + } + if (writeContext.getFileSize() != totalContentRead.get()) { + throw new IOException( + "Incorrect content length read for file " + + writeContext.getFileName() + + ", actual file size: " + + writeContext.getFileSize() + + ", bytes read: " + + totalContentRead.get() + ); + } + completableFuture.complete(new UploadResponse(true)); + + return completableFuture; + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsBlobStore.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsBlobStore.java new file mode 100644 index 0000000000000..94a3c5c31343c --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsBlobStore.java @@ -0,0 +1,33 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.remotestore.multipart.mocks; + +import org.opensearch.OpenSearchException; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.fs.FsBlobStore; + +import java.io.IOException; +import java.nio.file.Path; + +public class MockFsBlobStore extends FsBlobStore { + + public MockFsBlobStore(int bufferSizeInBytes, Path path, boolean readonly) throws IOException { + super(bufferSizeInBytes, path, readonly); + } + + @Override + public BlobContainer blobContainer(BlobPath path) { + try { + return new MockFsBlobContainer(this, path, buildAndCreate(path)); + } catch (IOException ex) { + throw new OpenSearchException("failed to create blob container", ex); + } + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsRepository.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsRepository.java new file mode 100644 index 0000000000000..3765b75ab3816 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsRepository.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore.multipart.mocks; + +import org.opensearch.cluster.metadata.RepositoryMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.fs.FsBlobStore; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.repositories.fs.FsRepository; + +public class MockFsRepository extends FsRepository { + + public MockFsRepository( + RepositoryMetadata metadata, + Environment environment, + NamedXContentRegistry namedXContentRegistry, + ClusterService clusterService, + RecoverySettings recoverySettings + ) { + super(metadata, environment, namedXContentRegistry, clusterService, recoverySettings); + } + + @Override + protected BlobStore createBlobStore() throws Exception { + FsBlobStore fsBlobStore = (FsBlobStore) super.createBlobStore(); + return new MockFsBlobStore(fsBlobStore.bufferSizeInBytes(), fsBlobStore.path(), isReadOnly()); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsRepositoryPlugin.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsRepositoryPlugin.java new file mode 100644 index 0000000000000..ffd53adf4e29e --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsRepositoryPlugin.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.remotestore.multipart.mocks; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.RepositoryPlugin; +import org.opensearch.repositories.Repository; + +import java.util.Collections; +import java.util.Map; + +public class MockFsRepositoryPlugin extends Plugin implements RepositoryPlugin { + + public static final String TYPE = "fs_multipart_repository"; + + @Override + public Map<String, Repository.Factory> getRepositories( + Environment env, + NamedXContentRegistry namedXContentRegistry, + ClusterService clusterService, + RecoverySettings recoverySettings + ) { + return Collections.singletonMap( + "fs_multipart_repository", + metadata -> new MockFsRepository(metadata, env, namedXContentRegistry, clusterService, recoverySettings) + ); + } } diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java index ac38768c9f3d3..340973b46e607 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java @@ -32,12 +32,16 @@ package org.opensearch.common.blobstore; +import org.opensearch.common.blobstore.stream.write.UploadResponse; +import org.opensearch.common.blobstore.stream.write.WriteContext; + import java.io.IOException; import java.io.InputStream; import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; /** * An interface for managing a repository of blob entries, where each blob entry is just a named group of bytes. @@ -124,6 +128,29 @@ default long readBlobPreferredLength() { */ void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException; + /** + * Checks whether the vendor plugin supports parallel uploads of multiple streams. + * Returns false by default. + * + * @return true if multi-stream parallel uploads are supported + */ + default boolean isMultiStreamUploadSupported() { + return false; + } + + /** + * Reads blob content in parallel from multiple streams, each covering a specific part of the file, as provided by the + * StreamContextSupplier in the WriteContext passed to this method, and writes it to the target blob. An {@link IOException} + * is thrown if reading any of the input streams fails, or if writing to the target blob fails + * + * @param writeContext A WriteContext object encapsulating all information needed to perform the upload + * @return A {@link CompletableFuture} representing the upload + * @throws IOException if any of the input streams could not be read, or the target blob could not be written to + */ + default CompletableFuture<UploadResponse> writeBlobByStreams(WriteContext writeContext) throws IOException { + throw new UnsupportedOperationException(); + } + /** * Reads blob content from the input stream and writes it to the container in a new blob with the given name, * using an atomic write operation if the implementation supports it. 
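Editor's note on the part arithmetic behind writeBlobByStreams: the mock container earlier in this patch derives its per-part streams from WriteContext#getStreamContext(partSize), so the actual split logic lives in StreamContext, which this patch does not show. The sketch below is only a plausible reconstruction of what such a split must get right, namely that every byte lands in exactly one part and the final part absorbs the remainder; the PartRange type is invented for the example and is not an OpenSearch class.

import java.util.ArrayList;
import java.util.List;

public class PartSplit {
    // Invented helper type for this sketch; the real code hands offsets to stream providers instead.
    static final class PartRange {
        final long offset;
        final long length;
        PartRange(long offset, long length) { this.offset = offset; this.length = length; }
        @Override
        public String toString() { return "offset=" + offset + " length=" + length; }
    }

    // Split fileSize into consecutive parts of partSize; the final part carries the remainder,
    // so a non-divisible size yields (fileSize / partSize) + 1 parts.
    static List<PartRange> split(long fileSize, long partSize) {
        List<PartRange> parts = new ArrayList<>();
        long offset = 0;
        while (offset + partSize < fileSize) {
            parts.add(new PartRange(offset, partSize));
            offset += partSize;
        }
        parts.add(new PartRange(offset, fileSize - offset)); // remainder (or only) part
        return parts;
    }

    public static void main(String[] args) {
        // Mirrors the mock's choice of partSize = fileSize / nParts with nParts = 10.
        long fileSize = 1037;
        split(fileSize, fileSize / 10).forEach(System.out::println);
    }
}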
diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/UploadResponse.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/UploadResponse.java new file mode 100644 index 0000000000000..8ffa555f47a21 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/UploadResponse.java @@ -0,0 +1,33 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.stream.write; + +/** + * Response object for uploads using BlobContainer#writeBlobByStreams + */ +public class UploadResponse { + + private final boolean uploadSuccessful; + + /** + * Construct a new UploadResponse object + * + * @param uploadSuccessful Whether the current upload was successful or not + */ + public UploadResponse(boolean uploadSuccessful) { + this.uploadSuccessful = uploadSuccessful; + } + + /** + * @return The upload success result + */ + public boolean isUploadSuccessful() { + return uploadSuccessful; + } +} diff --git a/server/src/main/java/org/opensearch/common/util/ByteUtils.java b/server/src/main/java/org/opensearch/common/util/ByteUtils.java index 36ae3b1f5bcaa..8c7665d991751 100644 --- a/server/src/main/java/org/opensearch/common/util/ByteUtils.java +++ b/server/src/main/java/org/opensearch/common/util/ByteUtils.java @@ -61,6 +61,16 @@ public static void writeLongLE(long l, byte[] arr, int offset) { assert l == 0; } + /** Convert long to a byte array in big-endian format */ + public static byte[] toByteArrayBE(long l) { + byte[] result = new byte[8]; + for (int i = 7; i >= 0; i--) { + result[i] = (byte) (l & 0xffL); + l >>= 8; + } + return result; + } + /** Write a long in little-endian format. 
*/ public static long readLongLE(byte[] arr, int offset) { long l = arr[offset++] & 0xFFL; diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index efd2686b41a20..ffef4b05a5195 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -199,7 +199,7 @@ String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos s // Visible for testing boolean uploadNewSegments(Collection<String> localFiles) throws IOException { AtomicBoolean uploadSuccess = new AtomicBoolean(true); - localFiles.stream().filter(file -> !EXCLUDE_FILES.contains(file)).filter(file -> { + Collection<String> filteredFiles = localFiles.stream().filter(file -> !EXCLUDE_FILES.contains(file)).filter(file -> { try { return !remoteDirectory.containsFile(file, getChecksumOfLocalFile(file)); } catch (IOException e) { @@ -209,15 +209,15 @@ boolean uploadNewSegments(Collection<String> localFiles) throws IOException { ); return true; } - }).forEach(file -> { - try { - remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT); - } catch (IOException e) { - uploadSuccess.set(false); - // ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397) - logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e); - } - }); + }).collect(Collectors.toList()); + + try { + remoteDirectory.copyFilesFrom(storeDirectory, filteredFiles, IOContext.DEFAULT); + } catch (Exception e) { + uploadSuccess.set(false); + // ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397) + logger.warn(() -> new ParameterizedMessage("Exception: [{}] while uploading segment files", e), e); + } return uploadSuccess.get(); } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index 62e2b12896411..7b0ff421678dd 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -37,6 +37,10 @@ public class RemoteDirectory extends Directory { private final BlobContainer blobContainer; + public BlobContainer getBlobContainer() { + return blobContainer; + } + public RemoteDirectory(BlobContainer blobContainer) { this.blobContainer = blobContainer; }
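Editor's note: the RemoteStoreRefreshListener hunk above replaces a per-file forEach upload with a filter-then-batch shape, where candidate segment files are materialized into a collection first and then handed to copyFilesFrom in a single call so the directory can parallelize the transfer. A self-contained miniature of that shape, with a hypothetical uploadBatch standing in for remoteDirectory.copyFilesFrom:

import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

public class BatchUploadShape {
    private static final Set<String> EXCLUDE_FILES = Set.of("write.lock");

    // Hypothetical stand-in for remoteDirectory.copyFilesFrom(storeDirectory, files, context).
    static void uploadBatch(Collection<String> files) throws Exception {
        System.out.println("uploading batch: " + files);
    }

    static boolean uploadNewSegments(Collection<String> localFiles) {
        AtomicBoolean uploadSuccess = new AtomicBoolean(true);
        // 1. Filter once and materialize the batch instead of uploading inside forEach.
        List<String> filteredFiles = localFiles.stream()
            .filter(file -> EXCLUDE_FILES.contains(file) == false)
            .collect(Collectors.toList());
        // 2. One call uploads the whole batch; a single failure now fails the batch
        //    instead of being logged per file.
        try {
            uploadBatch(filteredFiles);
        } catch (Exception e) {
            uploadSuccess.set(false);
        }
        return uploadSuccess.get();
    }

    public static void main(String[] args) {
        System.out.println(uploadNewSegments(List.of("_0.si", "write.lock", "_0.cfs")));
    }
}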
diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index c385303813844..238b2750b45e1 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; @@ -17,13 +18,20 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.opensearch.common.UUIDs; +import org.opensearch.common.blobstore.stream.write.UploadResponse; +import org.opensearch.common.blobstore.stream.write.WriteContext; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; +import org.opensearch.common.io.VersionedCodecStreamWrapper; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; -import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; import java.io.IOException; import java.nio.file.NoSuchFileException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -33,8 +41,11 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** @@ -317,14 +328,112 @@ public IndexInput openInput(String name, IOContext context) throws IOException { } } - public void copyFrom(Directory from, String src, String dest, IOContext context, boolean useCommonSuffix) throws IOException { + /** + * Copies a list of files from the source directory to the remote store, choosing the upload path based on multi-stream support. + * If the vendor plugin supports uploading multiple parts in parallel, BlobContainer#writeBlobByStreams + * is used; otherwise the legacy {@link RemoteSegmentStoreDirectory#copyFrom(Directory, String, String, IOContext, boolean)} + * is called. 
+ * + * @param from The directory for all files to be uploaded + * @param files A list containing the names of all files to be uploaded + * @param context IOContext to be used to open IndexInput to files during remote upload + * @throws Exception When upload future creation fails or if {@link RemoteSegmentStoreDirectory#copyFrom(Directory, String, String, IOContext, boolean)} + * throws an exception + */ + public void copyFilesFrom(Directory from, Collection<String> files, IOContext context) throws Exception { + + List<CompletableFuture<UploadResponse>> resultFutures = new ArrayList<>(); + + for (String src : files) { + String remoteFilename = createRemoteFileName(src, false); + if (remoteDataDirectory.getBlobContainer().isMultiStreamUploadSupported()) { + CompletableFuture<UploadResponse> resultFuture = createUploadFuture(from, src, remoteFilename, context); + resultFutures.add(resultFuture); + } else { + copyFrom(from, src, src, context, false); + } + } + + if (resultFutures.isEmpty() == false) { + CompletableFuture<Void> resultFuture = CompletableFuture.allOf(resultFutures.toArray(new CompletableFuture[0])); + try { + resultFuture.get(); + } catch (InterruptedException | ExecutionException e) { + throw e; + } + } + } + + private CompletableFuture<UploadResponse> createUploadFuture(Directory from, String src, String remoteFileName, IOContext ioContext) + throws Exception { + + AtomicReference<Exception> exceptionRef = new AtomicReference<>(); + long contentLength; + try (IndexInput indexInput = from.openInput(src, ioContext)) { + contentLength = indexInput.length(); + } + RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + src, + remoteFileName, + contentLength, + true, + WritePriority.NORMAL, + (size, position) -> new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position) + ); + WriteContext writeContext = remoteTransferContainer.createWriteContext(); + CompletableFuture<UploadResponse> uploadFuture = remoteDataDirectory.getBlobContainer().writeBlobByStreams(writeContext); + return uploadFuture.whenComplete((resp, throwable) -> { + try { + remoteTransferContainer.close(); + } catch (Exception e) { + logger.warn("Error occurred while closing streams", e); + } + if (throwable != null) { + handleException(throwable, exceptionRef); + } else { + try { + postUpload(from, src, remoteFileName); + } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("Exception in segment postUpload for file [{}]", src), e); + handleException(e, exceptionRef); + } + } + }); + } + + private void handleException(Throwable throwable, AtomicReference<Exception> exceptionRef) { + Exception ex; + if (throwable instanceof Exception) { + ex = (Exception) throwable; + } else { + ex = new RuntimeException(throwable); + } + + if (exceptionRef.get() == null) { + exceptionRef.set(ex); + } else { + exceptionRef.get().addSuppressed(ex); + } + } + + private String createRemoteFileName(String dest, boolean useCommonSuffix) { String remoteFilename; if (useCommonSuffix) { remoteFilename = dest + SEGMENT_NAME_UUID_SEPARATOR + this.commonFilenameSuffix; } else { remoteFilename = getNewRemoteSegmentFilename(dest); } + + return remoteFilename; + } + + public void copyFrom(Directory from, String src, String dest, IOContext context, boolean useCommonSuffix) throws IOException { + String remoteFilename = createRemoteFileName(dest, useCommonSuffix); remoteDataDirectory.copyFrom(from, src, remoteFilename, context); + postUpload(from, src, remoteFilename); + } + + private void postUpload(Directory from, String src, String remoteFilename) throws IOException { String checksum = getChecksumOfLocalFile(from, src); UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum, from.fileLength(src)); segmentsUploadedToRemoteStore.put(src, segmentMetadata);
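Editor's note: copyFilesFrom above fans out one CompletableFuture per file and joins them with CompletableFuture.allOf, which completes only once every member has completed and carries the failure if any member failed; that is why the tests later in this patch expect an ExecutionException from the blocking get(). A minimal standalone demonstration of that join-and-propagate behavior (file names and the induced failure are fabricated for the demo):

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class AllOfJoin {
    // Pretend per-file upload; one file is rigged to fail to show propagation.
    static CompletableFuture<String> uploadAsync(String file) {
        return CompletableFuture.supplyAsync(() -> {
            if (file.endsWith(".bad")) {
                throw new RuntimeException(new IOException("upload failed for " + file));
            }
            return file;
        });
    }

    public static void main(String[] args) throws InterruptedException {
        List<CompletableFuture<String>> futures = List.of("_0.si", "_0.cfe", "_0.bad")
            .stream()
            .map(AllOfJoin::uploadAsync)
            .collect(Collectors.toList());
        // allOf resolves once all members resolve, exceptionally if any member failed.
        CompletableFuture<Void> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        try {
            all.get();
            System.out.println("batch uploaded");
        } catch (ExecutionException e) {
            System.out.println("batch failed: " + e.getCause());
        }
    }
}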
diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 88fe816ccb462..0e697cf79bd3c 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -15,13 +15,25 @@ import org.opensearch.action.ActionRunnable; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.stream.write.UploadResponse; +import org.opensearch.common.blobstore.stream.write.WriteContext; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeFileInputStream; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; +import org.opensearch.index.translog.ChannelFactory; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.io.InputStream; +import java.nio.channels.FileChannel; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; /** * Service that handles remote transfer of translog and checkpoint files @@ -41,18 +53,18 @@ public BlobStoreTransferService(BlobStore blobStore, ThreadPool threadPool) { } @Override - public void uploadBlobAsync( - String threadpoolName, + public void uploadBlobByThreadPool( + String threadPoolName, final TransferFileSnapshot fileSnapshot, Iterable<String> remoteTransferPath, - ActionListener<TransferFileSnapshot> listener + ActionListener<TransferFileSnapshot> listener, + WritePriority writePriority ) { assert remoteTransferPath instanceof BlobPath; BlobPath blobPath = (BlobPath) remoteTransferPath; - threadPool.executor(threadpoolName).execute(ActionRunnable.wrap(listener, l -> { - try (InputStream inputStream = fileSnapshot.inputStream()) { - blobStore.blobContainer(blobPath) - .writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true); + threadPool.executor(threadPoolName).execute(ActionRunnable.wrap(listener, l -> { + try { + uploadBlob(fileSnapshot, blobPath, writePriority); l.onResponse(fileSnapshot); } catch (Exception e) { logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), e); @@ -62,14 +74,101 @@ public void uploadBlobAsync( } @Override - public void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable<String> remoteTransferPath) throws IOException { - assert remoteTransferPath instanceof BlobPath; + public void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable<String> remoteTransferPath, WritePriority writePriority) + throws IOException { BlobPath blobPath = (BlobPath) remoteTransferPath; try (InputStream inputStream = fileSnapshot.inputStream()) { blobStore.blobContainer(blobPath).writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true); + } catch (Exception ex) { + throw ex; + } + } + + @Override + public void uploadBlobs( + Set<TransferFileSnapshot>
fileSnapshots, + final Map<Long, BlobPath> blobPaths, + ActionListener<TransferFileSnapshot> listener, + WritePriority writePriority + ) throws Exception { + List<CompletableFuture<UploadResponse>> resultFutures = new ArrayList<>(); + fileSnapshots.forEach(fileSnapshot -> { + BlobPath blobPath = blobPaths.get(fileSnapshot.getPrimaryTerm()); + if (!blobStore.blobContainer(blobPath).isMultiStreamUploadSupported()) { + uploadBlobByThreadPool(ThreadPool.Names.TRANSLOG_TRANSFER, fileSnapshot, blobPath, listener, writePriority); + } else { + CompletableFuture<UploadResponse> resultFuture = createUploadFuture(fileSnapshot, listener, blobPath, writePriority); + if (resultFuture != null) { + resultFutures.add(resultFuture); + } + } + }); + + if (resultFutures.isEmpty() == false) { + CompletableFuture<Void> resultFuture = CompletableFuture.allOf(resultFutures.toArray(new CompletableFuture[0])); + try { + resultFuture.get(); + } catch (Exception e) { + logger.warn("Failed to upload blobs", e); + } } } + private CompletableFuture<UploadResponse> createUploadFuture( + TransferFileSnapshot fileSnapshot, + ActionListener<TransferFileSnapshot> listener, + BlobPath blobPath, + WritePriority writePriority + ) { + + CompletableFuture<UploadResponse> resultFuture = null; + try { + ChannelFactory channelFactory = FileChannel::open; + long contentLength; + try (FileChannel channel = channelFactory.open(fileSnapshot.getPath(), StandardOpenOption.READ)) { + contentLength = channel.size(); + } + RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + fileSnapshot.getName(), + fileSnapshot.getName(), + contentLength, + true, + writePriority, + (size, position) -> new OffsetRangeFileInputStream(fileSnapshot.getPath(), size, position) + ); + WriteContext writeContext = remoteTransferContainer.createWriteContext(); + CompletableFuture<UploadResponse> uploadFuture = blobStore.blobContainer(blobPath).writeBlobByStreams(writeContext); + resultFuture = uploadFuture.whenComplete((resp, throwable) -> { + try { + remoteTransferContainer.close(); + } catch (Exception e) { + logger.warn("Error occurred while closing streams", e); + } + if (throwable != null) { + logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), throwable); + listener.onFailure(new FileTransferException(fileSnapshot, throwable)); + } else if (!resp.isUploadSuccessful()) { + Exception ex = new IOException("Failed to upload blob " + fileSnapshot.getName()); + logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), ex); + listener.onFailure(new FileTransferException(fileSnapshot, ex)); + } else { + listener.onResponse(fileSnapshot); + } + }); + } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), e); + listener.onFailure(new FileTransferException(fileSnapshot, e)); + } finally { + try { + fileSnapshot.close(); + } catch (IOException e) { + logger.warn("Error while closing TransferFileSnapshot", e); + } + } + + return resultFuture; + } + @Override public InputStream downloadBlob(Iterable<String> path, String fileName) throws IOException { return blobStore.blobContainer((BlobPath) path).readBlob(fileName);
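Editor's note: both createUploadFuture methods in this patch follow the same whenComplete discipline, namely release the transfer resources first, then translate the outcome into exactly one terminal callback. A condensed, self-contained sketch of that pattern; the Listener interface here is a stand-in for OpenSearch's ActionListener, not the real API:

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public class WhenCompleteDiscipline {
    // Stand-in for ActionListener<T>.
    interface Listener<T> {
        void onResponse(T result);
        void onFailure(Exception e);
    }

    static <T> CompletableFuture<T> attach(CompletableFuture<T> upload, Closeable resources, Listener<T> listener) {
        return upload.whenComplete((result, throwable) -> {
            try {
                resources.close(); // 1. always release streams, on success and on failure
            } catch (IOException e) {
                System.err.println("error closing resources: " + e);
            }
            if (throwable != null) {
                // 2. exactly one terminal callback per upload
                listener.onFailure(throwable instanceof Exception ? (Exception) throwable : new RuntimeException(throwable));
            } else {
                listener.onResponse(result);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> upload = CompletableFuture.completedFuture("blob-1");
        attach(upload, () -> System.out.println("streams closed"), new Listener<String>() {
            @Override
            public void onResponse(String r) { System.out.println("uploaded " + r); }
            @Override
            public void onFailure(Exception e) { System.out.println("failed: " + e); }
        }).get();
    }
}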
diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 6aca3055a3f53..18507cd48cbdb 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -9,11 +9,14 @@ package org.opensearch.index.translog.transfer; import org.opensearch.action.ActionListener; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import java.io.IOException; import java.io.InputStream; import java.util.List; +import java.util.Map; import java.util.Set; /** @@ -25,25 +28,40 @@ public interface TransferService { /** * Uploads the {@link TransferFileSnapshot} async, once the upload is complete the callback is invoked - * @param threadpoolName threadpool type which will be used to upload blobs asynchronously + * @param threadPoolName threadpool type which will be used to upload blobs asynchronously * @param fileSnapshot the file snapshot to upload * @param remotePath the remote path where upload should be made * @param listener the callback to be invoked once upload completes successfully/fails + * @param writePriority Priority by which content needs to be written */ - void uploadBlobAsync( - String threadpoolName, + void uploadBlobByThreadPool( + String threadPoolName, final TransferFileSnapshot fileSnapshot, Iterable<String> remotePath, - ActionListener<TransferFileSnapshot> listener + ActionListener<TransferFileSnapshot> listener, + WritePriority writePriority ); + /** + * Uploads multiple {@link TransferFileSnapshot}s; the callback is invoked once each upload completes or fails + * @param fileSnapshots the file snapshots to upload + * @param blobPaths Primary term to {@link BlobPath} map + * @param listener the callback to be invoked once uploads complete successfully/fail + * @param writePriority Priority by which content needs to be written + */ + void uploadBlobs( + Set<TransferFileSnapshot> fileSnapshots, + final Map<Long, BlobPath> blobPaths, + ActionListener<TransferFileSnapshot> listener, + WritePriority writePriority + ) throws Exception; + /** * Uploads the {@link TransferFileSnapshot} blob * @param fileSnapshot the file snapshot to upload * @param remotePath the remote path where upload should be made + * @param writePriority Priority by which content needs to be written. 
* @throws IOException the exception while transferring the data */ - void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable<String> remotePath) throws IOException; + void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable<String> remotePath, WritePriority writePriority) throws IOException; void deleteBlobs(Iterable<String> path, List<String> fileNames) throws IOException; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 95536317bcc1b..a668ef0bbf45e 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -15,6 +15,7 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.LatchedActionListener; import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; @@ -98,14 +99,16 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans }), latch ); + Map<Long, BlobPath> blobPathMap = new HashMap<>(); toUpload.forEach( - fileSnapshot -> transferService.uploadBlobAsync( - ThreadPool.Names.TRANSLOG_TRANSFER, - fileSnapshot, - remoteBaseTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())), - latchedActionListener + fileSnapshot -> blobPathMap.put( + fileSnapshot.getPrimaryTerm(), + remoteBaseTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())) ) ); + + transferService.uploadBlobs(toUpload, blobPathMap, latchedActionListener, WritePriority.HIGH); + try { if (latch.await(TRANSFER_TIMEOUT_IN_MILLIS, TimeUnit.MILLISECONDS) == false) { Exception ex = new TimeoutException("Timed out waiting for transfer of snapshot " + transferSnapshot + " to complete"); @@ -118,7 +121,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans throw ex; } if (exceptionList.isEmpty()) { - transferService.uploadBlob(prepareMetadata(transferSnapshot), remoteMetadataTransferPath); + transferService.uploadBlob(prepareMetadata(transferSnapshot), remoteMetadataTransferPath, WritePriority.HIGH); translogTransferListener.onUploadComplete(transferSnapshot); return true; } else {
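Editor's note: transferSnapshot above now builds the primary-term to BlobPath map up front and issues one uploadBlobs call, with a countdown latch gating completion of all per-file callbacks. A self-contained miniature of that routing-and-latch flow; the Snapshot type and the threaded upload simulation are fabricated for the demo:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class PrimaryTermRouting {
    // Fabricated stand-in for TransferFileSnapshot.
    static final class Snapshot {
        final String name;
        final long primaryTerm;
        Snapshot(String name, long primaryTerm) { this.name = name; this.primaryTerm = primaryTerm; }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Snapshot> toUpload = List.of(new Snapshot("translog-5.tlog", 2), new Snapshot("translog-5.ckp", 2));

        // 1. Route every snapshot with the same primary term to the same remote path.
        Map<Long, String> blobPathMap = new HashMap<>();
        toUpload.forEach(s -> blobPathMap.put(s.primaryTerm, "base/" + s.primaryTerm));

        // 2. One latch count per file; each upload callback counts down exactly once.
        CountDownLatch latch = new CountDownLatch(toUpload.size());
        for (Snapshot s : toUpload) {
            new Thread(() -> {
                System.out.println("uploaded " + s.name + " -> " + blobPathMap.get(s.primaryTerm));
                latch.countDown();
            }).start();
        }

        // 3. Bounded wait, mirroring TRANSFER_TIMEOUT_IN_MILLIS in the manager.
        if (latch.await(30, TimeUnit.SECONDS) == false) {
            throw new IllegalStateException("timed out waiting for transfer to complete");
        }
        System.out.println("all uploads acknowledged");
    }
}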
diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 49a2d50dfae06..3c5952e26eb96 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -20,6 +20,9 @@ import org.apache.lucene.tests.util.LuceneTestCase; import org.junit.Before; import org.opensearch.common.UUIDs; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.stream.write.UploadResponse; +import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.store.ByteArrayIndexInput; @@ -34,6 +37,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doReturn; @@ -364,6 +369,96 @@ public void testCopyFrom() throws IOException { storeDirectory.close(); } + public void testCopyFilesFromMultipart() throws Exception { + String filename = "_100.si"; + populateMetadata(); + remoteSegmentStoreDirectory.init(); + + Directory storeDirectory = LuceneTestCase.newDirectory(); + IndexOutput indexOutput = storeDirectory.createOutput(filename, IOContext.DEFAULT); + indexOutput.writeString("Hello World!"); + CodecUtil.writeFooter(indexOutput); + indexOutput.close(); + storeDirectory.sync(List.of(filename)); + + assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); + + BlobContainer blobContainer = mock(BlobContainer.class); + when(remoteDataDirectory.getBlobContainer()).thenReturn(blobContainer); + when(blobContainer.isMultiStreamUploadSupported()).thenReturn(true); + CompletableFuture<UploadResponse> uploadResponseCompletableFuture = new CompletableFuture<>(); + uploadResponseCompletableFuture.complete(new UploadResponse(true)); + when(blobContainer.writeBlobByStreams(any(WriteContext.class))).thenReturn(uploadResponseCompletableFuture); + + remoteSegmentStoreDirectory.copyFilesFrom(storeDirectory, List.of(filename), IOContext.DEFAULT); + + assertTrue(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); + + storeDirectory.close(); + } + + public void testCopyFilesFromMultipartIOException() throws Exception { + String filename = "_100.si"; + populateMetadata(); + remoteSegmentStoreDirectory.init(); + + Directory storeDirectory = LuceneTestCase.newDirectory(); + IndexOutput indexOutput = storeDirectory.createOutput(filename, IOContext.DEFAULT); + indexOutput.writeString("Hello World!"); + CodecUtil.writeFooter(indexOutput); + indexOutput.close(); + storeDirectory.sync(List.of(filename)); + + assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); + + BlobContainer blobContainer = mock(BlobContainer.class); + when(remoteDataDirectory.getBlobContainer()).thenReturn(blobContainer); + when(blobContainer.isMultiStreamUploadSupported()).thenReturn(true); + CompletableFuture<UploadResponse> uploadResponseCompletableFuture = new CompletableFuture<>(); + uploadResponseCompletableFuture.complete(new UploadResponse(true)); + when(blobContainer.writeBlobByStreams(any(WriteContext.class))).thenThrow(new IOException()); + + assertThrows( + IOException.class, + () -> remoteSegmentStoreDirectory.copyFilesFrom(storeDirectory, List.of(filename), IOContext.DEFAULT) + ); + + assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); + + storeDirectory.close(); + } + + public void testCopyFilesFromMultipartUploadFutureCompletedExceptionally() throws Exception { + String filename = "_100.si"; + populateMetadata(); + remoteSegmentStoreDirectory.init(); + + Directory storeDirectory = LuceneTestCase.newDirectory(); + IndexOutput indexOutput = storeDirectory.createOutput(filename, IOContext.DEFAULT); + indexOutput.writeString("Hello World!"); + CodecUtil.writeFooter(indexOutput); + indexOutput.close(); + storeDirectory.sync(List.of(filename)); + + assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); + + BlobContainer blobContainer = mock(BlobContainer.class); + when(remoteDataDirectory.getBlobContainer()).thenReturn(blobContainer); + 
when(blobContainer.isMultiStreamUploadSupported()).thenReturn(true); + CompletableFuture<UploadResponse> uploadResponseCompletableFuture = new CompletableFuture<>(); + uploadResponseCompletableFuture.completeExceptionally(new IOException()); + when(blobContainer.writeBlobByStreams(any(WriteContext.class))).thenReturn(uploadResponseCompletableFuture); + + assertThrows( + ExecutionException.class, + () -> remoteSegmentStoreDirectory.copyFilesFrom(storeDirectory, List.of(filename), IOContext.DEFAULT) + ); + + assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); + + storeDirectory.close(); + } + public void testCopyFromException() throws IOException { String filename = "_100.si"; Directory storeDirectory = LuceneTestCase.newDirectory(); diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceMockRepositoryTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceMockRepositoryTests.java new file mode 100644 index 0000000000000..4326d820d9b56 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceMockRepositoryTests.java @@ -0,0 +1,213 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.LatchedActionListener; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.stream.write.UploadResponse; +import org.opensearch.common.blobstore.stream.write.WriteContext; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.Collections; +import java.util.HashMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class BlobStoreTransferServiceMockRepositoryTests extends OpenSearchTestCase { + + private ThreadPool threadPool; + + private BlobStore blobStore; + + @Override + public void setUp() throws Exception { + super.setUp(); + blobStore = mock(BlobStore.class); + threadPool = new TestThreadPool(getClass().getName()); + } + + public void testUploadBlobs() throws Exception { + Path testFile = createTempFile(); + Files.write(testFile, randomByteArrayOfLength(128), StandardOpenOption.APPEND); + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot(testFile, randomNonNegativeLong()); + + BlobContainer blobContainer = mock(BlobContainer.class); + when(blobContainer.isMultiStreamUploadSupported()).thenReturn(true); + CompletableFuture<UploadResponse> uploadResponseCompletableFuture = new 
CompletableFuture<>(); + uploadResponseCompletableFuture.complete(new UploadResponse(true)); + when(blobContainer.writeBlobByStreams(any(WriteContext.class))).thenReturn(uploadResponseCompletableFuture); + when(blobStore.blobContainer(any(BlobPath.class))).thenReturn(blobContainer); + + TransferService transferService = new BlobStoreTransferService(blobStore, threadPool); + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean onResponseCalled = new AtomicBoolean(false); + AtomicReference<Exception> exceptionRef = new AtomicReference<>(); + AtomicReference<FileSnapshot.TransferFileSnapshot> fileSnapshotRef = new AtomicReference<>(); + transferService.uploadBlobs(Collections.singleton(transferFileSnapshot), new HashMap<>() { + { + put(transferFileSnapshot.getPrimaryTerm(), new BlobPath().add("sample_path")); + } + }, new LatchedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(FileSnapshot.TransferFileSnapshot fileSnapshot) { + onResponseCalled.set(true); + fileSnapshotRef.set(fileSnapshot); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + } + }, latch), WritePriority.HIGH); + + assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); + verify(blobContainer).writeBlobByStreams(any(WriteContext.class)); + assertTrue(onResponseCalled.get()); + assertEquals(transferFileSnapshot.getPrimaryTerm(), fileSnapshotRef.get().getPrimaryTerm()); + assertEquals(transferFileSnapshot.getName(), fileSnapshotRef.get().getName()); + assertNull(exceptionRef.get()); + } + + public void testUploadBlobsIOException() throws Exception { + Path testFile = createTempFile(); + Files.write(testFile, randomByteArrayOfLength(128), StandardOpenOption.APPEND); + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot(testFile, randomNonNegativeLong()); + + BlobContainer blobContainer = mock(BlobContainer.class); + when(blobContainer.isMultiStreamUploadSupported()).thenReturn(true); + doThrow(new IOException()).when(blobContainer).writeBlobByStreams(any(WriteContext.class)); + when(blobStore.blobContainer(any(BlobPath.class))).thenReturn(blobContainer); + + TransferService transferService = new BlobStoreTransferService(blobStore, threadPool); + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean onResponseCalled = new AtomicBoolean(false); + AtomicReference<Exception> exceptionRef = new AtomicReference<>(); + transferService.uploadBlobs(Collections.singleton(transferFileSnapshot), new HashMap<>() { + { + put(transferFileSnapshot.getPrimaryTerm(), new BlobPath().add("sample_path")); + } + }, new LatchedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(FileSnapshot.TransferFileSnapshot fileSnapshot) { + onResponseCalled.set(true); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + } + }, latch), WritePriority.HIGH); + + assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); + verify(blobContainer).writeBlobByStreams(any(WriteContext.class)); + assertFalse(onResponseCalled.get()); + assertTrue(exceptionRef.get() instanceof FileTransferException); + } + + public void testUploadBlobsUploadFutureCompletedExceptionally() throws Exception { + Path testFile = createTempFile(); + Files.write(testFile, randomByteArrayOfLength(128), StandardOpenOption.APPEND); + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot(testFile, randomNonNegativeLong()); + + BlobContainer blobContainer = mock(BlobContainer.class); + when(blobContainer.isMultiStreamUploadSupported()).thenReturn(true);
+ CompletableFuture<UploadResponse> uploadResponseCompletableFuture = new CompletableFuture<>(); + uploadResponseCompletableFuture.completeExceptionally(new IOException()); + when(blobContainer.writeBlobByStreams(any(WriteContext.class))).thenReturn(uploadResponseCompletableFuture); + when(blobStore.blobContainer(any(BlobPath.class))).thenReturn(blobContainer); + + TransferService transferService = new BlobStoreTransferService(blobStore, threadPool); + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean onResponseCalled = new AtomicBoolean(false); + AtomicReference<Exception> exceptionRef = new AtomicReference<>(); + transferService.uploadBlobs(Collections.singleton(transferFileSnapshot), new HashMap<>() { + { + put(transferFileSnapshot.getPrimaryTerm(), new BlobPath().add("sample_path")); + } + }, new LatchedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(FileSnapshot.TransferFileSnapshot fileSnapshot) { + onResponseCalled.set(true); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + } + }, latch), WritePriority.HIGH); + + assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); + verify(blobContainer).writeBlobByStreams(any(WriteContext.class)); + assertFalse(onResponseCalled.get()); + assertTrue(exceptionRef.get() instanceof FileTransferException); + } + + public void testUploadBlobsUploadUnsuccessful() throws Exception { + Path testFile = createTempFile(); + Files.write(testFile, randomByteArrayOfLength(128), StandardOpenOption.APPEND); + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot(testFile, randomNonNegativeLong()); + + BlobContainer blobContainer = mock(BlobContainer.class); + when(blobContainer.isMultiStreamUploadSupported()).thenReturn(true); + CompletableFuture<UploadResponse> uploadResponseCompletableFuture = new CompletableFuture<>(); + uploadResponseCompletableFuture.complete(new UploadResponse(false)); + when(blobContainer.writeBlobByStreams(any(WriteContext.class))).thenReturn(uploadResponseCompletableFuture); + when(blobStore.blobContainer(any(BlobPath.class))).thenReturn(blobContainer); + + TransferService transferService = new BlobStoreTransferService(blobStore, threadPool); + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean onResponseCalled = new AtomicBoolean(false); + AtomicReference<Exception> exceptionRef = new AtomicReference<>(); + transferService.uploadBlobs(Collections.singleton(transferFileSnapshot), new HashMap<>() { + { + put(transferFileSnapshot.getPrimaryTerm(), new BlobPath().add("sample_path")); + } + }, new LatchedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(FileSnapshot.TransferFileSnapshot fileSnapshot) { + onResponseCalled.set(true); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + } + }, latch), WritePriority.HIGH); + + assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); + verify(blobContainer).writeBlobByStreams(any(WriteContext.class)); + assertFalse(onResponseCalled.get()); + assertTrue(exceptionRef.get() instanceof FileTransferException); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } +} diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java index 196fbd58c2c20..cc00c49892853 --- 
a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java @@ -12,6 +12,7 @@ import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.env.Environment; @@ -51,7 +52,7 @@ public void testUploadBlob() throws IOException { Files.write(testFile, randomByteArrayOfLength(128), StandardOpenOption.APPEND); FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot(testFile, randomNonNegativeLong()); TransferService transferService = new BlobStoreTransferService(repository.blobStore(), threadPool); - transferService.uploadBlob(transferFileSnapshot, repository.basePath()); + transferService.uploadBlob(transferFileSnapshot, repository.basePath(), WritePriority.HIGH); } public void testUploadBlobFromByteArray() throws IOException { @@ -61,7 +62,7 @@ public void testUploadBlobFromByteArray() throws IOException { 1 ); TransferService transferService = new BlobStoreTransferService(repository.blobStore(), threadPool); - transferService.uploadBlob(transferFileSnapshot, repository.basePath()); + transferService.uploadBlob(transferFileSnapshot, repository.basePath(), WritePriority.NORMAL); } public void testUploadBlobAsync() throws IOException, InterruptedException { @@ -71,7 +72,7 @@ public void testUploadBlobAsync() throws IOException, InterruptedException { FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot(testFile, randomNonNegativeLong()); CountDownLatch latch = new CountDownLatch(1); TransferService transferService = new BlobStoreTransferService(repository.blobStore(), threadPool); - transferService.uploadBlobAsync( + transferService.uploadBlobByThreadPool( ThreadPool.Names.TRANSLOG_TRANSFER, transferFileSnapshot, repository.basePath(), @@ -87,7 +88,8 @@ public void onResponse(FileSnapshot.TransferFileSnapshot fileSnapshot) { public void onFailure(Exception e) { throw new AssertionError("Failed to perform uploadBlobAsync", e); } - }, latch) + }, latch), + WritePriority.HIGH ); assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); assertTrue(succeeded.get()); diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 0a21be99bc06b..ad0196367eb8f 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -14,6 +14,7 @@ import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.util.set.Sets; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.Translog; @@ -35,6 +36,8 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.Mockito.any; 
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; @@ -71,20 +74,24 @@ public void tearDown() throws Exception { } @SuppressWarnings("unchecked") - public void testTransferSnapshot() throws IOException { + public void testTransferSnapshot() throws Exception { AtomicInteger fileTransferSucceeded = new AtomicInteger(); AtomicInteger fileTransferFailed = new AtomicInteger(); AtomicInteger translogTransferSucceeded = new AtomicInteger(); AtomicInteger translogTransferFailed = new AtomicInteger(); doNothing().when(transferService) - .uploadBlob(any(TransferFileSnapshot.class), Mockito.eq(remoteBaseTransferPath.add(String.valueOf(primaryTerm)))); + .uploadBlob( + any(TransferFileSnapshot.class), + Mockito.eq(remoteBaseTransferPath.add(String.valueOf(primaryTerm))), + any(WritePriority.class) + ); doAnswer(invocationOnMock -> { - ActionListener<TransferFileSnapshot> listener = (ActionListener<TransferFileSnapshot>) invocationOnMock.getArguments()[3]; - listener.onResponse((TransferFileSnapshot) invocationOnMock.getArguments()[1]); + ActionListener<TransferFileSnapshot> listener = (ActionListener<TransferFileSnapshot>) invocationOnMock.getArguments()[2]; + Set<TransferFileSnapshot> transferFileSnapshots = (Set<TransferFileSnapshot>) invocationOnMock.getArguments()[0]; + transferFileSnapshots.forEach(listener::onResponse); return null; - }).when(transferService) - .uploadBlobAsync(any(String.class), any(TransferFileSnapshot.class), any(BlobPath.class), any(ActionListener.class)); + }).when(transferService).uploadBlobs(anySet(), anyMap(), any(ActionListener.class), any(WritePriority.class)); FileTransferTracker fileTransferTracker = new FileTransferTracker(new ShardId("index", "indexUUid", 0)) { @Override