Single Commit for Translog Optimisation
Signed-off-by: Sandeep Kumawat <[email protected]>
skumawat2025 committed May 27, 2024
1 parent 215ffc8 commit d43f00b
Showing 71 changed files with 2,007 additions and 482 deletions.
CHANGELOG.md (1 addition, 0 deletions)

@@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for Azure Managed Identity in repository-azure ([#12423](https://github.com/opensearch-project/OpenSearch/issues/12423))
- Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478))
- Make outbound side of transport protocol dependent ([#13293](https://github.com/opensearch-project/OpenSearch/pull/13293))
+ - [Remote Store] Upload translog checkpoint as object metadata to translog.tlog ([#13637](https://github.com/opensearch-project/OpenSearch/pull/13637))

### Dependencies
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))
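Taken together, the changes below let a caller attach a small map of string key/value pairs to an upload and read it back alongside the blob, which is what lets the translog checkpoint travel with translog.tlog instead of living in a separate object. A minimal caller-side sketch of the round trip follows; this is hypothetical glue, not code from this commit: the blob name and the "ckp-data" key are illustrative, and it assumes InputStreamWithMetadata is closeable and exposes the map via a getMetadata() getter matching the constructor shown below.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.InputStreamWithMetadata;

class TranslogMetadataRoundTrip {
    // Upload a translog blob with its checkpoint riding along as object metadata,
    // then read the metadata back without a separate GET for a .ckp object.
    static Map<String, String> roundTrip(BlobContainer container, byte[] translog, String checkpointB64) throws IOException {
        Map<String, String> metadata = Map.of("ckp-data", checkpointB64); // illustrative key
        try (InputStream in = new ByteArrayInputStream(translog)) {
            container.writeBlobWithMetadata("translog-42.tlog", in, translog.length, true, metadata);
        }
        try (InputStreamWithMetadata blob = container.readBlobWithMetadata("translog-42.tlog")) {
            return blob.getMetadata();
        }
    }
}

One sizing constraint worth noting: S3 caps user-defined object metadata at 2 KB per object, so the pattern only suits payloads as small as a translog checkpoint.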
S3BlobContainer.java

@@ -62,6 +62,7 @@
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
+ import software.amazon.awssdk.utils.CollectionUtils;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -77,6 +78,7 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStoreException;
import org.opensearch.common.blobstore.DeleteResult;
+ import org.opensearch.common.blobstore.InputStreamWithMetadata;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
@@ -139,6 +141,13 @@ public boolean blobExists(String blobName) {
}
}

+ @ExperimentalApi
+ @Override
+ public InputStreamWithMetadata readBlobWithMetadata(String blobName) throws IOException {
+ S3RetryingInputStream s3RetryingInputStream = new S3RetryingInputStream(blobStore, buildKey(blobName));
+ return new InputStreamWithMetadata(s3RetryingInputStream, s3RetryingInputStream.getMetadata());
+ }

@Override
public InputStream readBlob(String blobName) throws IOException {
return new S3RetryingInputStream(blobStore, buildKey(blobName));
@@ -170,12 +179,27 @@ public long readBlobPreferredLength() {
*/
@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
+ writeBlobWithMetadata(blobName, inputStream, blobSize, failIfAlreadyExists, null);
+ }
+
+ /**
+ * Write blob with its object metadata.
+ */
+ @ExperimentalApi
+ @Override
+ public void writeBlobWithMetadata(
+ String blobName,
+ InputStream inputStream,
+ long blobSize,
+ boolean failIfAlreadyExists,
+ @Nullable Map<String, String> metadata
+ ) throws IOException {
assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests";
SocketAccess.doPrivilegedIOException(() -> {
if (blobSize <= getLargeBlobThresholdInBytes()) {
- executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize);
+ executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
} else {
- executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize);
+ executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
}
return null;
});
@@ -191,7 +215,8 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener)
writeContext.getUploadFinalizer(),
writeContext.doRemoteDataIntegrityCheck(),
writeContext.getExpectedChecksum(),
- blobStore.isUploadRetryEnabled()
+ blobStore.isUploadRetryEnabled(),
+ writeContext.getMetadata()
);
try {
// If file size is greater than the queue capacity than SizeBasedBlockingQ will always reject the upload.
@@ -211,7 +236,8 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener)
blobStore,
uploadRequest.getKey(),
inputStream.getInputStream(),
- uploadRequest.getContentLength()
+ uploadRequest.getContentLength(),
+ uploadRequest.getMetadata()
);
completionListener.onResponse(null);
} catch (Exception ex) {
@@ -325,7 +351,7 @@ public void readBlobAsync(String blobName, ActionListener<ReadContext> listener)
);
}
}
- listener.onResponse(new ReadContext(blobSize, blobPartInputStreamFutures, blobChecksum));
+ listener.onResponse(new ReadContext.Builder(blobSize, blobPartInputStreamFutures).blobChecksum(blobChecksum).build());
} catch (Exception ex) {
listener.onFailure(ex);
}
@@ -582,8 +608,13 @@ private String buildKey(String blobName) {
/**
* Uploads a blob using a single upload request
*/
- void executeSingleUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize)
- throws IOException {
+ void executeSingleUpload(
+ final S3BlobStore blobStore,
+ final String blobName,
+ final InputStream input,
+ final long blobSize,
+ final Map<String, String> metadata
+ ) throws IOException {

// Extra safety checks
if (blobSize > MAX_FILE_SIZE.getBytes()) {
@@ -600,6 +631,10 @@ void executeSingleUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize)
.storageClass(blobStore.getStorageClass())
.acl(blobStore.getCannedACL())
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().putObjectMetricPublisher));

+ if (CollectionUtils.isNotEmpty(metadata)) {
+ putObjectRequestBuilder = putObjectRequestBuilder.metadata(metadata);
+ }
if (blobStore.serverSideEncryption()) {
putObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
}
@@ -623,8 +658,13 @@ void executeSingleUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize)
/**
* Uploads a blob using multipart upload requests.
*/
- void executeMultipartUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize)
- throws IOException {
+ void executeMultipartUpload(
+ final S3BlobStore blobStore,
+ final String blobName,
+ final InputStream input,
+ final long blobSize,
+ final Map<String, String> metadata
+ ) throws IOException {

ensureMultiPartUploadSize(blobSize);
final long partSize = blobStore.bufferSizeInBytes();
@@ -649,6 +689,10 @@ void executeMultipartUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize)
.acl(blobStore.getCannedACL())
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector));

+ if (CollectionUtils.isNotEmpty(metadata)) {
+ createMultipartUploadRequestBuilder.metadata(metadata);
+ }

if (blobStore.serverSideEncryption()) {
createMultipartUploadRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
}
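Both upload paths above attach the map to the request builder only when it is non-null and non-empty, and the SDK then sends each entry as an x-amz-meta-<key> header on the PUT. The same pattern in isolation against the plain synchronous SDK; a sketch with placeholder bucket and key, where the explicit null/empty check stands in for the SDK's CollectionUtils.isNotEmpty:

import java.util.Map;

import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

class MetadataPutSketch {
    static void put(S3Client s3, byte[] payload, Map<String, String> metadata) {
        PutObjectRequest.Builder builder = PutObjectRequest.builder().bucket("my-bucket").key("my-key");
        // Only set metadata when there is something to send, mirroring the guard above.
        if (metadata != null && !metadata.isEmpty()) {
            builder = builder.metadata(metadata); // transmitted as x-amz-meta-* headers
        }
        s3.putObject(builder.build(), RequestBody.fromBytes(payload));
    }
}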
S3BlobStore.java

@@ -244,6 +244,11 @@ public Map<Metric, Map<String, Long>> extendedStats() {
return extendedStats;
}

+ @Override
+ public boolean isBlobMetadataEnabled() {
+ return true;
+ }

public ObjectCannedACL getCannedACL() {
return cannedACL;
}
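The override is a capability flag: an S3-backed blob store can persist object metadata, while stores that cannot are expected to report false (presumably the BlobStore default; the base interface is not shown in this diff). A hypothetical caller-side gate, with illustrative names and fallback:

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobStore;

class MetadataAwareUploader {
    static void upload(BlobStore store, BlobContainer container, String name, InputStream in, long size, Map<String, String> metadata) throws IOException {
        if (store.isBlobMetadataEnabled()) {
            // The store can carry the checkpoint inline with the blob.
            container.writeBlobWithMetadata(name, in, size, true, metadata);
        } else {
            // No metadata support: write the blob alone; the caller must persist metadata elsewhere.
            container.writeBlob(name, in, size, true);
        }
    }
}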
S3RetryingInputStream.java

@@ -48,6 +48,7 @@
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.List;
+ import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/**
@@ -77,6 +78,7 @@ class S3RetryingInputStream extends InputStream {
private long currentOffset;
private boolean closed;
private boolean eof;
+ private Map<String, String> metadata;

S3RetryingInputStream(S3BlobStore blobStore, String blobKey) throws IOException {
this(blobStore, blobKey, 0, Long.MAX_VALUE - 1);
@@ -122,6 +124,7 @@ private void openStream() throws IOException {
getObjectResponseInputStream.response().contentLength()
);
this.currentStream = getObjectResponseInputStream;
+ this.metadata = getObjectResponseInputStream.response().metadata();
this.isStreamAborted.set(false);
} catch (final SdkException e) {
if (e instanceof S3Exception) {
@@ -265,4 +268,8 @@ boolean isEof() {
boolean isAborted() {
return isStreamAborted.get();
}

+ Map<String, String> getMetadata() {
+ return this.metadata;
+ }
}
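The map is captured in openStream() from the same GetObjectResponse that backs the retrying stream, so it is available as soon as the blob is opened and is refreshed if a retry reopens the object. What the SDK surfaces there, shown in isolation with placeholder bucket and key:

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;

import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

class GetMetadataSketch {
    static Map<String, String> fetchMetadata(S3Client s3) {
        GetObjectRequest request = GetObjectRequest.builder().bucket("my-bucket").key("my-key").build();
        try (ResponseInputStream<GetObjectResponse> object = s3.getObject(request)) {
            // User-defined metadata comes back as x-amz-meta-* headers, minus the prefix.
            return object.response().metadata();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}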
AsyncTransferManager.java

@@ -23,6 +23,7 @@
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
+ import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.awssdk.utils.CompletableFutureUtils;

import org.apache.logging.log4j.LogManager;
@@ -154,6 +155,10 @@ private void uploadInParts(
.bucket(uploadRequest.getBucket())
.key(uploadRequest.getKey())
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector));

+ if (CollectionUtils.isNotEmpty(uploadRequest.getMetadata())) {
+ createMultipartUploadRequestBuilder.metadata(uploadRequest.getMetadata());
+ }
if (uploadRequest.doRemoteDataIntegrityCheck()) {
createMultipartUploadRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
}
@@ -352,6 +357,10 @@ private void uploadInOneChunk(
.key(uploadRequest.getKey())
.contentLength(uploadRequest.getContentLength())
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.putObjectMetricPublisher));

+ if (CollectionUtils.isNotEmpty(uploadRequest.getMetadata())) {
+ putObjectRequestBuilder.metadata(uploadRequest.getMetadata());
+ }
if (uploadRequest.doRemoteDataIntegrityCheck()) {
putObjectRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
putObjectRequestBuilder.checksumCRC32(base64StringFromLong(uploadRequest.getExpectedChecksum()));
UploadRequest.java

@@ -9,9 +9,11 @@
package org.opensearch.repositories.s3.async;

import org.opensearch.common.CheckedConsumer;
+ import org.opensearch.common.Nullable;
import org.opensearch.common.blobstore.stream.write.WritePriority;

import java.io.IOException;
+ import java.util.Map;

/**
* A model encapsulating all details for an upload to S3
@@ -25,6 +27,7 @@ public class UploadRequest {
private final boolean doRemoteDataIntegrityCheck;
private final Long expectedChecksum;
private final boolean uploadRetryEnabled;
+ private final Map<String, String> metadata;

/**
* Construct a new UploadRequest object
@@ -36,6 +39,7 @@ public class UploadRequest {
* @param uploadFinalizer An upload finalizer to call once all parts are uploaded
* @param doRemoteDataIntegrityCheck A boolean to inform vendor plugins whether remote data integrity checks need to be done
* @param expectedChecksum Checksum of the file being uploaded for remote data integrity check
+ * @param metadata Metadata of the file being uploaded
*/
public UploadRequest(
String bucket,
@@ -45,7 +49,8 @@ public UploadRequest(
CheckedConsumer<Boolean, IOException> uploadFinalizer,
boolean doRemoteDataIntegrityCheck,
Long expectedChecksum,
- boolean uploadRetryEnabled
+ boolean uploadRetryEnabled,
+ @Nullable Map<String, String> metadata
) {
this.bucket = bucket;
this.key = key;
@@ -55,6 +60,7 @@ public UploadRequest(
this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck;
this.expectedChecksum = expectedChecksum;
this.uploadRetryEnabled = uploadRetryEnabled;
+ this.metadata = metadata;
}

public String getBucket() {
@@ -88,4 +94,11 @@ public Long getExpectedChecksum() {
public boolean isUploadRetryEnabled() {
return uploadRetryEnabled;
}

+ /**
+ * @return metadata of the blob to be uploaded
+ */
+ public Map<String, String> getMetadata() {
+ return metadata;
+ }
}
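UploadRequest is how the async path threads the map from WriteContext.getMetadata() in asyncBlobUpload above down to the multipart and single-chunk uploads. A hypothetical construction; the bucket, key, checksum value, and no-op finalizer are placeholders, and the three constructor parameters hidden in this diff (key, content length, write priority) are assumed to sit between bucket and finalizer:

import java.util.Map;

import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.repositories.s3.async.UploadRequest;

class UploadRequestSketch {
    static UploadRequest translogUpload(long contentLength, Long expectedChecksum, String checkpointB64) {
        return new UploadRequest(
            "my-bucket",
            "translog-42.tlog",
            contentLength,
            WritePriority.HIGH,
            success -> {},                    // upload finalizer: no-op for the sketch
            true,                             // doRemoteDataIntegrityCheck
            expectedChecksum,
            true,                             // uploadRetryEnabled
            Map.of("ckp-data", checkpointB64) // @Nullable: passing null skips metadata
        );
    }
}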
[Diff truncated: the remaining changed files of the 71 are not shown here.]