From 73453718a1c85565a3f7e4309d6fa83bf9a30522 Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat <2025sandeepkumawat@gmail.com> Date: Fri, 12 Apr 2024 11:51:14 +0530 Subject: [PATCH] =?UTF-8?q?Implement=20interface=20changes=20for=20s3=20pl?= =?UTF-8?q?ugin=20to=20read/write=20blob=20with=20obj=E2=80=A6=20(#13113)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Implement interface changes for s3 plugin to read/write blob with object metadata --------- Signed-off-by: Sandeep Kumawat Signed-off-by: Sandeep Kumawat <2025sandeepkumawat@gmail.com> Co-authored-by: Sandeep Kumawat --- .../repositories/s3/S3BlobContainer.java | 60 ++++++++++++++++--- .../s3/S3RetryingInputStream.java | 7 +++ .../s3/async/AsyncTransferManager.java | 9 +++ .../repositories/s3/async/UploadRequest.java | 16 ++++- .../s3/S3BlobContainerMockClientTests.java | 11 +++- .../s3/S3BlobContainerRetriesTests.java | 59 ++++++++++++++++-- .../s3/S3BlobStoreContainerTests.java | 32 +++++++--- .../s3/S3RetryingInputStreamTests.java | 11 ++++ .../s3/async/AsyncTransferManagerTests.java | 25 ++++++-- .../common/blobstore/BlobContainer.java | 13 ++-- ...loadResponse.java => FetchBlobResult.java} | 7 ++- .../blobstore/stream/write/WriteContext.java | 2 +- .../transfer/BlobStoreTransferService.java | 4 +- .../translog/transfer/TransferService.java | 6 +- 14 files changed, 220 insertions(+), 42 deletions(-) rename server/src/main/java/org/opensearch/common/blobstore/{BlobDownloadResponse.java => FetchBlobResult.java} (82%) diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 1f23a09a047f2..14829a066ca3a 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -62,6 +62,7 @@ import software.amazon.awssdk.services.s3.model.UploadPartRequest; import software.amazon.awssdk.services.s3.model.UploadPartResponse; import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable; +import software.amazon.awssdk.utils.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -77,6 +78,7 @@ import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStoreException; import org.opensearch.common.blobstore.DeleteResult; +import org.opensearch.common.blobstore.FetchBlobResult; import org.opensearch.common.blobstore.stream.read.ReadContext; import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.blobstore.stream.write.WritePriority; @@ -138,6 +140,13 @@ public boolean blobExists(String blobName) { } } + @ExperimentalApi + @Override + public FetchBlobResult readBlobWithMetadata(String blobName) throws IOException { + S3RetryingInputStream s3RetryingInputStream = new S3RetryingInputStream(blobStore, buildKey(blobName)); + return new FetchBlobResult(s3RetryingInputStream, s3RetryingInputStream.getMetadata()); + } + @Override public InputStream readBlob(String blobName) throws IOException { return new S3RetryingInputStream(blobStore, buildKey(blobName)); @@ -169,12 +178,27 @@ public long readBlobPreferredLength() { */ @Override public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { + writeBlobWithMetadata(blobName, inputStream, blobSize, failIfAlreadyExists, null); + } + + /** + * Write blob with its object metadata. + */ + @ExperimentalApi + @Override + public void writeBlobWithMetadata( + String blobName, + InputStream inputStream, + long blobSize, + boolean failIfAlreadyExists, + @Nullable Map metadata + ) throws IOException { assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests"; SocketAccess.doPrivilegedIOException(() -> { if (blobSize <= getLargeBlobThresholdInBytes()) { - executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize); + executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata); } else { - executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize); + executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata); } return null; }); @@ -190,7 +214,8 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp writeContext.getUploadFinalizer(), writeContext.doRemoteDataIntegrityCheck(), writeContext.getExpectedChecksum(), - blobStore.isUploadRetryEnabled() + blobStore.isUploadRetryEnabled(), + writeContext.getMetadata() ); try { if (uploadRequest.getContentLength() > ByteSizeUnit.GB.toBytes(10) && blobStore.isRedirectLargeUploads()) { @@ -203,7 +228,8 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp blobStore, uploadRequest.getKey(), inputStream.getInputStream(), - uploadRequest.getContentLength() + uploadRequest.getContentLength(), + uploadRequest.getMetadata() ); completionListener.onResponse(null); } catch (Exception ex) { @@ -542,8 +568,13 @@ private String buildKey(String blobName) { /** * Uploads a blob using a single upload request */ - void executeSingleUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize) - throws IOException { + void executeSingleUpload( + final S3BlobStore blobStore, + final String blobName, + final InputStream input, + final long blobSize, + final Map metadata + ) throws IOException { // Extra safety checks if (blobSize > MAX_FILE_SIZE.getBytes()) { @@ -560,6 +591,10 @@ void executeSingleUpload(final S3BlobStore blobStore, final String blobName, fin .storageClass(blobStore.getStorageClass()) .acl(blobStore.getCannedACL()) .overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().putObjectMetricPublisher)); + + if (CollectionUtils.isNotEmpty(metadata)) { + putObjectRequestBuilder = putObjectRequestBuilder.metadata(metadata); + } if (blobStore.serverSideEncryption()) { putObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256); } @@ -583,8 +618,13 @@ void executeSingleUpload(final S3BlobStore blobStore, final String blobName, fin /** * Uploads a blob using multipart upload requests. */ - void executeMultipartUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize) - throws IOException { + void executeMultipartUpload( + final S3BlobStore blobStore, + final String blobName, + final InputStream input, + final long blobSize, + final Map metadata + ) throws IOException { ensureMultiPartUploadSize(blobSize); final long partSize = blobStore.bufferSizeInBytes(); @@ -609,6 +649,10 @@ void executeMultipartUpload(final S3BlobStore blobStore, final String blobName, .acl(blobStore.getCannedACL()) .overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector)); + if (CollectionUtils.isNotEmpty(metadata)) { + createMultipartUploadRequestBuilder.metadata(metadata); + } + if (blobStore.serverSideEncryption()) { createMultipartUploadRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256); } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RetryingInputStream.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RetryingInputStream.java index d7e47e0ab1bcc..2eb63178b19e0 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RetryingInputStream.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RetryingInputStream.java @@ -48,6 +48,7 @@ import java.nio.file.NoSuchFileException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -77,6 +78,7 @@ class S3RetryingInputStream extends InputStream { private long currentOffset; private boolean closed; private boolean eof; + private Map metadata; S3RetryingInputStream(S3BlobStore blobStore, String blobKey) throws IOException { this(blobStore, blobKey, 0, Long.MAX_VALUE - 1); @@ -122,6 +124,7 @@ private void openStream() throws IOException { getObjectResponseInputStream.response().contentLength() ); this.currentStream = getObjectResponseInputStream; + this.metadata = getObjectResponseInputStream.response().metadata(); this.isStreamAborted.set(false); } catch (final SdkException e) { if (e instanceof S3Exception) { @@ -265,4 +268,8 @@ boolean isEof() { boolean isAborted() { return isStreamAborted.get(); } + + Map getMetadata() { + return this.metadata; + } } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java index 2259780c95276..80538059d17b8 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java @@ -22,6 +22,7 @@ import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.awssdk.utils.CompletableFutureUtils; import org.apache.logging.log4j.LogManager; @@ -131,6 +132,10 @@ private void uploadInParts( .bucket(uploadRequest.getBucket()) .key(uploadRequest.getKey()) .overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector)); + + if (CollectionUtils.isNotEmpty(uploadRequest.getMetadata())) { + createMultipartUploadRequestBuilder.metadata(uploadRequest.getMetadata()); + } if (uploadRequest.doRemoteDataIntegrityCheck()) { createMultipartUploadRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32); } @@ -327,6 +332,10 @@ private void uploadInOneChunk( .key(uploadRequest.getKey()) .contentLength(uploadRequest.getContentLength()) .overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.putObjectMetricPublisher)); + + if (CollectionUtils.isNotEmpty(uploadRequest.getMetadata())) { + putObjectRequestBuilder.metadata(uploadRequest.getMetadata()); + } if (uploadRequest.doRemoteDataIntegrityCheck()) { putObjectRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32); putObjectRequestBuilder.checksumCRC32(base64StringFromLong(uploadRequest.getExpectedChecksum())); diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java index a5304dc4a97d6..b944a72225d36 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java @@ -9,9 +9,11 @@ package org.opensearch.repositories.s3.async; import org.opensearch.common.CheckedConsumer; +import org.opensearch.common.Nullable; import org.opensearch.common.blobstore.stream.write.WritePriority; import java.io.IOException; +import java.util.Map; /** * A model encapsulating all details for an upload to S3 @@ -24,8 +26,8 @@ public class UploadRequest { private final CheckedConsumer uploadFinalizer; private final boolean doRemoteDataIntegrityCheck; private final Long expectedChecksum; - private boolean uploadRetryEnabled; + private final Map metadata; /** * Construct a new UploadRequest object @@ -37,6 +39,7 @@ public class UploadRequest { * @param uploadFinalizer An upload finalizer to call once all parts are uploaded * @param doRemoteDataIntegrityCheck A boolean to inform vendor plugins whether remote data integrity checks need to be done * @param expectedChecksum Checksum of the file being uploaded for remote data integrity check + * @param metadata Metadata of the file being uploaded */ public UploadRequest( String bucket, @@ -46,7 +49,8 @@ public UploadRequest( CheckedConsumer uploadFinalizer, boolean doRemoteDataIntegrityCheck, Long expectedChecksum, - boolean uploadRetryEnabled + boolean uploadRetryEnabled, + @Nullable Map metadata ) { this.bucket = bucket; this.key = key; @@ -56,6 +60,7 @@ public UploadRequest( this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck; this.expectedChecksum = expectedChecksum; this.uploadRetryEnabled = uploadRetryEnabled; + this.metadata = metadata; } public String getBucket() { @@ -89,4 +94,11 @@ public Long getExpectedChecksum() { public boolean isUploadRetryEnabled() { return uploadRetryEnabled; } + + /** + * @return metadata of the blob to be uploaded + */ + public Map getMetadata() { + return metadata; + } } diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java index 9e830c409a58b..4173f8b66387f 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java @@ -57,6 +57,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.concurrent.CompletableFuture; @@ -75,6 +76,7 @@ import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; @@ -659,6 +661,7 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) t .writePriority(WritePriority.HIGH) .uploadFinalizer(Assert::assertTrue) .doRemoteDataIntegrityCheck(false) + .metadata(new HashMap<>()) .build(); s3BlobContainer.asyncBlobUpload(writeContext, completionListener); @@ -668,7 +671,13 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) t } else { assertNull(exceptionRef.get()); } - verify(s3BlobContainer, times(1)).executeMultipartUpload(any(S3BlobStore.class), anyString(), any(InputStream.class), anyLong()); + verify(s3BlobContainer, times(1)).executeMultipartUpload( + any(S3BlobStore.class), + anyString(), + any(InputStream.class), + anyLong(), + anyMap() + ); if (expectException) { verify(client, times(1)).abortMultipartUpload(any(AbortMultipartUploadRequest.class)); diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java index 8e25ba4d950ef..10578090da75c 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -79,8 +79,10 @@ import java.net.SocketTimeoutException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -240,7 +242,7 @@ public InputStream readBlob(String blobName, long position, long length) throws }; } - public void testWriteBlobWithRetries() throws Exception { + public void writeBlobWithRetriesHelper(Map metadata) throws Exception { final int maxRetries = randomInt(5); final CountDown countDown = new CountDown(maxRetries + 1); @@ -280,11 +282,26 @@ public void testWriteBlobWithRetries() throws Exception { final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null); try (InputStream stream = new ByteArrayInputStream(bytes)) { - blobContainer.writeBlob("write_blob_max_retries", stream, bytes.length, false); + if (metadata != null) { + blobContainer.writeBlobWithMetadata("write_blob_max_retries", stream, bytes.length, false, metadata); + } else { + blobContainer.writeBlob("write_blob_max_retries", stream, bytes.length, false); + } } assertThat(countDown.isCountedDown(), is(true)); } + public void testWriteBlobWithMetadataWithRetries() throws Exception { + Map metadata = new HashMap<>(); + metadata.put("key1", "value1"); + metadata.put("key2", "value2"); + writeBlobWithRetriesHelper(metadata); + } + + public void testWriteBlobWithRetries() throws Exception { + writeBlobWithRetriesHelper(null); + } + public void testWriteBlobByStreamsWithRetries() throws Exception { final int maxRetries = randomInt(5); final CountDown countDown = new CountDown(maxRetries + 1); @@ -368,7 +385,7 @@ private int calculateNumberOfParts(long contentLength, long partSize) { return (int) ((contentLength % partSize) == 0 ? contentLength / partSize : (contentLength / partSize) + 1); } - public void testWriteBlobWithReadTimeouts() { + public void writeBlobWithReadTimeoutsHelper(Map metadata) { final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128)); final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500)); final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null); @@ -386,7 +403,11 @@ public void testWriteBlobWithReadTimeouts() { Exception exception = expectThrows(IOException.class, () -> { try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", bytes), bytes.length)) { - blobContainer.writeBlob("write_blob_timeout", stream, bytes.length, false); + if (metadata != null) { + blobContainer.writeBlobWithMetadata("write_blob_timeout", stream, bytes.length, false, metadata); + } else { + blobContainer.writeBlob("write_blob_timeout", stream, bytes.length, false); + } } }); assertThat( @@ -401,7 +422,18 @@ public void testWriteBlobWithReadTimeouts() { assertThat(exception.getCause().getCause().getMessage().toLowerCase(Locale.ROOT), containsString("read timed out")); } - public void testWriteLargeBlob() throws Exception { + public void testWriteBlobWithMetadataWithReadTimeouts() throws Exception { + Map metadata = new HashMap<>(); + metadata.put("key1", "value1"); + metadata.put("key2", "value2"); + writeBlobWithReadTimeoutsHelper(metadata); + } + + public void testWriteBlobWithReadTimeouts() throws Exception { + writeBlobWithReadTimeoutsHelper(null); + } + + public void WriteLargeBlobHelper(Map metadata) throws Exception { final boolean useTimeout = rarely(); final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null; final ByteSizeValue bufferSize = new ByteSizeValue(5, ByteSizeUnit.MB); @@ -487,13 +519,28 @@ public void testWriteLargeBlob() throws Exception { } }); - blobContainer.writeBlob("write_large_blob", new ZeroInputStream(blobSize), blobSize, false); + if (metadata != null) { + blobContainer.writeBlobWithMetadata("write_large_blob", new ZeroInputStream(blobSize), blobSize, false, metadata); + } else { + blobContainer.writeBlob("write_large_blob", new ZeroInputStream(blobSize), blobSize, false); + } assertThat(countDownInitiate.isCountedDown(), is(true)); assertThat(countDownUploads.get(), equalTo(0)); assertThat(countDownComplete.isCountedDown(), is(true)); } + public void testWriteLargeBlobWithMetadata() throws Exception { + Map metadata = new HashMap<>(); + metadata.put("key1", "value1"); + metadata.put("key2", "value2"); + WriteLargeBlobHelper(metadata); + } + + public void testWriteLargeBlob() throws Exception { + WriteLargeBlobHelper(null); + } + /** * Asserts that an InputStream is fully consumed, or aborted, when it is closed */ diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java index 2b45e9cfe2d4b..654d8a72690c4 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java @@ -90,6 +90,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -125,7 +126,7 @@ public void testExecuteSingleUploadBlobSizeTooLarge() { final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, - () -> blobContainer.executeSingleUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize) + () -> blobContainer.executeSingleUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize, null) ); assertEquals("Upload request size [" + blobSize + "] can't be larger than 5gb", e.getMessage()); } @@ -139,7 +140,13 @@ public void testExecuteSingleUploadBlobSizeLargerThanBufferSize() { final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, - () -> blobContainer.executeSingleUpload(blobStore, blobName, new ByteArrayInputStream(new byte[0]), ByteSizeUnit.MB.toBytes(2)) + () -> blobContainer.executeSingleUpload( + blobStore, + blobName, + new ByteArrayInputStream(new byte[0]), + ByteSizeUnit.MB.toBytes(2), + null + ) ); assertEquals("Upload request size [2097152] can't be larger than buffer size", e.getMessage()); } @@ -430,6 +437,10 @@ public void testExecuteSingleUpload() throws IOException { final String bucketName = randomAlphaOfLengthBetween(1, 10); final String blobName = randomAlphaOfLengthBetween(1, 10); + final Map metadata = new HashMap<>(); + metadata.put("key1", "value1"); + metadata.put("key2", "value2"); + final BlobPath blobPath = new BlobPath(); if (randomBoolean()) { IntStream.of(randomIntBetween(1, 5)).forEach(value -> blobPath.add("path_" + value)); @@ -467,7 +478,7 @@ public void testExecuteSingleUpload() throws IOException { ); final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[blobSize]); - blobContainer.executeSingleUpload(blobStore, blobName, inputStream, blobSize); + blobContainer.executeSingleUpload(blobStore, blobName, inputStream, blobSize, metadata); final PutObjectRequest request = putObjectRequestArgumentCaptor.getValue(); final RequestBody requestBody = requestBodyArgumentCaptor.getValue(); @@ -480,6 +491,7 @@ public void testExecuteSingleUpload() throws IOException { assertEquals(blobSize, request.contentLength().longValue()); assertEquals(storageClass, request.storageClass()); assertEquals(cannedAccessControlList, request.acl()); + assertEquals(metadata, request.metadata()); if (serverSideEncryption) { assertEquals(ServerSideEncryption.AES256, request.serverSideEncryption()); } @@ -492,7 +504,7 @@ public void testExecuteMultipartUploadBlobSizeTooLarge() { final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, - () -> blobContainer.executeMultipartUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize) + () -> blobContainer.executeMultipartUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize, null) ); assertEquals("Multipart upload request size [" + blobSize + "] can't be larger than 5tb", e.getMessage()); } @@ -504,7 +516,7 @@ public void testExecuteMultipartUploadBlobSizeTooSmall() { final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, - () -> blobContainer.executeMultipartUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize) + () -> blobContainer.executeMultipartUpload(blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize, null) ); assertEquals("Multipart upload request size [" + blobSize + "] can't be smaller than 5mb", e.getMessage()); } @@ -513,6 +525,10 @@ public void testExecuteMultipartUpload() throws IOException { final String bucketName = randomAlphaOfLengthBetween(1, 10); final String blobName = randomAlphaOfLengthBetween(1, 10); + final Map metadata = new HashMap<>(); + metadata.put("key1", "value1"); + metadata.put("key2", "value2"); + final BlobPath blobPath = new BlobPath(); if (randomBoolean()) { IntStream.of(randomIntBetween(1, 5)).forEach(value -> blobPath.add("path_" + value)); @@ -577,13 +593,15 @@ public void testExecuteMultipartUpload() throws IOException { final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[0]); final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); - blobContainer.executeMultipartUpload(blobStore, blobName, inputStream, blobSize); + blobContainer.executeMultipartUpload(blobStore, blobName, inputStream, blobSize, metadata); final CreateMultipartUploadRequest initRequest = createMultipartUploadRequestArgumentCaptor.getValue(); assertEquals(bucketName, initRequest.bucket()); assertEquals(blobPath.buildAsString() + blobName, initRequest.key()); assertEquals(storageClass, initRequest.storageClass()); assertEquals(cannedAccessControlList, initRequest.acl()); + assertEquals(metadata, initRequest.metadata()); + if (serverSideEncryption) { assertEquals(ServerSideEncryption.AES256, initRequest.serverSideEncryption()); } @@ -686,7 +704,7 @@ public void testExecuteMultipartUploadAborted() { final IOException e = expectThrows(IOException.class, () -> { final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); - blobContainer.executeMultipartUpload(blobStore, blobName, new ByteArrayInputStream(new byte[0]), blobSize); + blobContainer.executeMultipartUpload(blobStore, blobName, new ByteArrayInputStream(new byte[0]), blobSize, null); }); assertEquals("Unable to upload object [" + blobName + "] using multipart upload", e.getMessage()); diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RetryingInputStreamTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RetryingInputStreamTests.java index b38d5119b4108..d884a46f3ecc5 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RetryingInputStreamTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RetryingInputStreamTests.java @@ -43,6 +43,8 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import static org.hamcrest.Matchers.is; import static org.mockito.Mockito.any; @@ -61,6 +63,15 @@ public void testInputStreamFullyConsumed() throws IOException { assertThat(stream.isAborted(), is(false)); } + public void testInputStreamGetMetadata() throws IOException { + final byte[] expectedBytes = randomByteArrayOfLength(randomIntBetween(1, 512)); + + final S3RetryingInputStream stream = createInputStream(expectedBytes, 0L, (long) (Integer.MAX_VALUE - 1)); + + Map metadata = new HashMap<>(); + assertEquals(stream.getMetadata(), metadata); + } + public void testInputStreamIsAborted() throws IOException { final byte[] expectedBytes = randomByteArrayOfLength(randomIntBetween(10, 512)); final byte[] actualBytes = new byte[randomIntBetween(1, Math.max(1, expectedBytes.length - 1))]; diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java index b753b847df869..04d1819bef02b 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java @@ -40,7 +40,9 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -78,11 +80,14 @@ public void testOneChunkUpload() { ); AtomicReference streamRef = new AtomicReference<>(); + Map metadata = new HashMap<>(); + metadata.put("key1", "value1"); + metadata.put("key2", "value2"); CompletableFuture resultFuture = asyncTransferManager.uploadObject( s3AsyncClient, new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(1), WritePriority.HIGH, uploadSuccess -> { // do nothing - }, false, null, true), + }, false, null, true, metadata), new StreamContext((partIdx, partSize, position) -> { streamRef.set(new ZeroInputStream(partSize)); return new InputStreamContainer(streamRef.get(), partSize, position); @@ -123,11 +128,15 @@ public void testOneChunkUploadCorruption() { deleteObjectResponseCompletableFuture.complete(DeleteObjectResponse.builder().build()); when(s3AsyncClient.deleteObject(any(DeleteObjectRequest.class))).thenReturn(deleteObjectResponseCompletableFuture); + Map metadata = new HashMap<>(); + metadata.put("key1", "value1"); + metadata.put("key2", "value2"); + CompletableFuture resultFuture = asyncTransferManager.uploadObject( s3AsyncClient, new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(1), WritePriority.HIGH, uploadSuccess -> { // do nothing - }, false, null, true), + }, false, null, true, metadata), new StreamContext( (partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position), ByteSizeUnit.MB.toBytes(1), @@ -175,12 +184,16 @@ public void testMultipartUpload() { abortMultipartUploadResponseCompletableFuture ); + Map metadata = new HashMap<>(); + metadata.put("key1", "value1"); + metadata.put("key2", "value2"); + List streams = new ArrayList<>(); CompletableFuture resultFuture = asyncTransferManager.uploadObject( s3AsyncClient, new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(5), WritePriority.HIGH, uploadSuccess -> { // do nothing - }, true, 3376132981L, true), + }, true, 3376132981L, true, metadata), new StreamContext((partIdx, partSize, position) -> { InputStream stream = new ZeroInputStream(partSize); streams.add(stream); @@ -236,11 +249,15 @@ public void testMultipartUploadCorruption() { abortMultipartUploadResponseCompletableFuture ); + Map metadata = new HashMap<>(); + metadata.put("key1", "value1"); + metadata.put("key2", "value2"); + CompletableFuture resultFuture = asyncTransferManager.uploadObject( s3AsyncClient, new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(5), WritePriority.HIGH, uploadSuccess -> { // do nothing - }, true, 0L, true), + }, true, 0L, true, metadata), new StreamContext( (partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position), ByteSizeUnit.MB.toBytes(1), diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java index e16e75de7f27d..4f5f8d4b1ef5f 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java @@ -32,6 +32,7 @@ package org.opensearch.common.blobstore; +import org.opensearch.common.Nullable; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.core.action.ActionListener; @@ -79,16 +80,16 @@ public interface BlobContainer { InputStream readBlob(String blobName) throws IOException; /** - * Creates a new {@link BlobDownloadResponse} for the given blob name. + * Creates a new {@link FetchBlobResult} for the given blob name. * * @param blobName * The name of the blob to get an {@link InputStream} for. - * @return The {@link BlobDownloadResponse} of the blob. + * @return The {@link FetchBlobResult} of the blob. * @throws NoSuchFileException if the blob does not exist * @throws IOException if the blob can not be read. */ @ExperimentalApi - default BlobDownloadResponse readBlobWithMetadata(String blobName) throws IOException { + default FetchBlobResult readBlobWithMetadata(String blobName) throws IOException { throw new UnsupportedOperationException("readBlobWithMetadata is not implemented yet"); }; @@ -166,9 +167,9 @@ default long readBlobPreferredLength() { default void writeBlobWithMetadata( String blobName, InputStream inputStream, - Map metadata, long blobSize, - boolean failIfAlreadyExists + boolean failIfAlreadyExists, + @Nullable Map metadata ) throws IOException { throw new UnsupportedOperationException("writeBlobWithMetadata is not implemented yet"); }; @@ -219,7 +220,7 @@ default void writeBlobWithMetadata( default void writeBlobAtomicWithMetadata( String blobName, InputStream inputStream, - Map metadata, + @Nullable Map metadata, long blobSize, boolean failIfAlreadyExists ) throws IOException { diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java b/server/src/main/java/org/opensearch/common/blobstore/FetchBlobResult.java similarity index 82% rename from server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java rename to server/src/main/java/org/opensearch/common/blobstore/FetchBlobResult.java index 97f3e4a16a76c..55aca771b586c 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java +++ b/server/src/main/java/org/opensearch/common/blobstore/FetchBlobResult.java @@ -8,6 +8,8 @@ package org.opensearch.common.blobstore; +import org.opensearch.common.annotation.ExperimentalApi; + import java.io.InputStream; import java.util.Map; @@ -17,7 +19,8 @@ * * @opensearch.experimental */ -public class BlobDownloadResponse { +@ExperimentalApi +public class FetchBlobResult { /** * Downloaded blob InputStream @@ -37,7 +40,7 @@ public Map getMetadata() { return metadata; } - public BlobDownloadResponse(InputStream inputStream, Map metadata) { + public FetchBlobResult(InputStream inputStream, Map metadata) { this.inputStream = inputStream; this.metadata = metadata; } diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java index d14800e82e495..9a7fc24f7e484 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java @@ -52,7 +52,7 @@ private WriteContext( CheckedConsumer uploadFinalizer, boolean doRemoteDataIntegrityCheck, @Nullable Long expectedChecksum, - Map metadata + @Nullable Map metadata ) { this.fileName = fileName; this.streamContextSupplier = streamContextSupplier; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index a1d5041ff9aff..bec2d78d9af62 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -15,10 +15,10 @@ import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; -import org.opensearch.common.blobstore.BlobDownloadResponse; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.FetchBlobResult; import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; @@ -168,7 +168,7 @@ public InputStream downloadBlob(Iterable path, String fileName) throws I @Override @ExperimentalApi - public BlobDownloadResponse downloadBlobWithMetadata(Iterable path, String fileName) throws IOException { + public FetchBlobResult downloadBlobWithMetadata(Iterable path, String fileName) throws IOException { return blobStore.blobContainer((BlobPath) path).readBlobWithMetadata(fileName); } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 5fe8b02058383..0894ebf500ebd 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -9,9 +9,9 @@ package org.opensearch.index.translog.transfer; import org.opensearch.common.annotation.ExperimentalApi; -import org.opensearch.common.blobstore.BlobDownloadResponse; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.FetchBlobResult; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.core.action.ActionListener; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; @@ -131,11 +131,11 @@ void uploadBlobs( * * @param path the remote path from where download should be made * @param fileName the name of the file - * @return {@link BlobDownloadResponse} of the remote file + * @return {@link FetchBlobResult} of the remote file * @throws IOException the exception while reading the data */ @ExperimentalApi - BlobDownloadResponse downloadBlobWithMetadata(Iterable path, String fileName) throws IOException; + FetchBlobResult downloadBlobWithMetadata(Iterable path, String fileName) throws IOException; void listAllInSortedOrder(Iterable path, String filenamePrefix, int limit, ActionListener> listener);