Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ufal/s3-check-etag #537

Merged
merged 6 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,22 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.amazonaws.services.s3.transfer.Upload;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.dspace.content.Bitstream;
Expand All @@ -38,6 +48,16 @@ public class SyncS3BitStoreService extends S3BitStoreService {
// Class logger (log4j2)
private static final Logger log = LogManager.getLogger(SyncS3BitStoreService.class);
// If true, bitstreams are mirrored to the local assetstore in addition to S3.
// May be forced on in the constructor; otherwise read from the
// `sync.storage.service.enabled` property in init().
private boolean syncEnabled = false;

/**
 * The uploading file is divided into parts and each part is uploaded separately. The size of the part is 50 MB.
 */
private static final long UPLOAD_FILE_PART_SIZE = 50 * 1024 * 1024; // 50 MB

/**
 * Upload large file by parts - check the checksum of every part
 */
private boolean uploadByParts = false;

// Local (filesystem) store used for the synchronized copy of each bitstream.
// Package-private so Spring can inject it; presumably also accessed by tests - TODO confirm.
@Autowired(required = true)
DSBitStoreService dsBitStoreService;

Expand All @@ -48,9 +68,29 @@ public SyncS3BitStoreService() {
super();
}

/**
 * Create the service with both flags fixed up front. Flags enabled here take
 * precedence: {@code init()} will not override them from the configuration.
 *
 * @param syncEnabled   if true, uploaded files are also written to the local assetstore
 * @param uploadByParts if true, files are uploaded to S3 in separate checksummed parts
 */
public SyncS3BitStoreService(boolean syncEnabled, boolean uploadByParts) {
    super();
    this.uploadByParts = uploadByParts;
    this.syncEnabled = syncEnabled;
}

/**
 * Initialize the S3 store, then resolve the sync/upload-by-parts flags.
 *
 * The flags may already have been forced on via the two-argument constructor;
 * in that case the configuration must NOT override them, so each property is
 * consulted only while its flag is still false.
 *
 * @throws IOException if the underlying S3 store initialization fails
 */
public void init() throws IOException {
    super.init();

    // Bug fix: the flags were previously re-read from configuration
    // unconditionally, clobbering constructor-supplied values.
    if (!syncEnabled) {
        syncEnabled = configurationService.getBooleanProperty("sync.storage.service.enabled", false);
    }
    if (!uploadByParts) {
        uploadByParts = configurationService.getBooleanProperty("s3.upload.by.parts.enabled", false);
    }
}

@Override
Expand All @@ -66,9 +106,11 @@ public void put(Bitstream bitstream, InputStream in) throws IOException {
Utils.bufferedCopy(dis, fos);
in.close();

Upload upload = tm.upload(getBucketName(), key, scratchFile);

upload.waitForUploadResult();
if (uploadByParts) {
uploadByParts(key, scratchFile);
} else {
uploadFluently(key, scratchFile);
}

bitstream.setSizeBytes(scratchFile.length());
// we cannot use the S3 ETAG here as it could be not a MD5 in case of multipart upload (large files) or if
Expand Down Expand Up @@ -119,6 +161,7 @@ public void remove(Bitstream bitstream) throws IOException {

/**
* Create a new file in the assetstore if it does not exist
*
* @param localFile
* @throws IOException
*/
Expand All @@ -137,4 +180,116 @@ private void createFileIfNotExist(File localFile) throws IOException {
" was not created");
}
}

/**
 * Upload the whole file to S3 in one request and block until the transfer
 * manager reports the upload as finished.
 *
 * @param key the bitstream's internalId, used as the S3 object key
 * @param scratchFile the local file whose content is uploaded
 * @throws InterruptedException if the thread is interrupted while waiting for S3
 */
private void uploadFluently(String key, File scratchFile) throws InterruptedException {
    // Single-request upload; waitForUploadResult() blocks until completion.
    tm.upload(getBucketName(), key, scratchFile).waitForUploadResult();
}

/**
 * Upload a file to S3 as a multipart upload. The file is split into
 * {@code UPLOAD_FILE_PART_SIZE} chunks; each chunk's locally computed MD5 is
 * compared against the ETag S3 returns for that part, and the upload fails on
 * the first mismatch.
 *
 * @param key the bitstream's internalId, used as the S3 object key
 * @param scratchFile the local file to upload
 * @throws IOException if an I/O error occurs, a part checksum does not match,
 *                     or the S3 client reports an error
 */
private void uploadByParts(String key, File scratchFile) throws IOException {
    // One MessageDigest is reused for every part: digest() resets it after
    // each call in calculatePartChecksum().
    MessageDigest digest;
    try {
        digest = MessageDigest.getInstance("MD5");
    } catch (NoSuchAlgorithmException e) {
        // MD5 is mandated by the JCA spec, so this is a programming/runtime error
        throw new RuntimeException("MD5 algorithm not available", e);
    }

    // Start the multipart upload; its id must accompany every part request
    // and the final completion call.
    InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(getBucketName(), key);
    String uploadId = this.s3Service.initiateMultipartUpload(initiateRequest).getUploadId();

    // ETags of the uploaded parts, required to complete the upload
    List<PartETag> partETags = new ArrayList<>();

    try {
        long fileLength = scratchFile.length();
        long remainingBytes = fileLength;
        int partNumber = 1; // S3 part numbers are 1-based

        while (remainingBytes > 0) {
            long bytesToUpload = Math.min(UPLOAD_FILE_PART_SIZE, remainingBytes);
            long offset = fileLength - remainingBytes;

            // Locally computed MD5 of exactly this slice of the file
            String partChecksum = calculatePartChecksum(scratchFile, offset, bytesToUpload, digest);

            UploadPartRequest uploadRequest = new UploadPartRequest()
                    .withBucketName(this.getBucketName())
                    .withKey(key)
                    .withUploadId(uploadId)
                    .withPartNumber(partNumber)
                    .withFile(scratchFile)
                    .withFileOffset(offset)
                    .withPartSize(bytesToUpload);

            // Upload the part and collect its ETag for the completion request
            UploadPartResult uploadPartResponse = this.s3Service.uploadPart(uploadRequest);
            partETags.add(uploadPartResponse.getPartETag());

            // For a single part, S3's ETag is the MD5 of the part's bytes;
            // a mismatch means the data was corrupted in transit.
            if (!StringUtils.equals(uploadPartResponse.getETag(), partChecksum)) {
                String errorMessage = "Checksums do not match error: The locally computed checksum does " +
                        "not match with the ETag from the UploadPartResult. Local checksum: " + partChecksum +
                        ", ETag: " + uploadPartResponse.getETag() + ", partNumber: " + partNumber;
                log.error(errorMessage);
                throw new IOException(errorMessage);
            }

            remainingBytes -= bytesToUpload;
            partNumber++;
        }

        // All parts uploaded and verified - finish the multipart upload
        CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(this.getBucketName(),
                key, uploadId, partETags);
        this.s3Service.completeMultipartUpload(completeRequest);
    } catch (AmazonClientException e) {
        // Bug fix: the failure used to be swallowed after logging, so the
        // caller recorded the bitstream as stored even though the S3 upload
        // never completed. Propagate it as an IOException instead.
        log.error("Cannot upload the file by parts because: ", e);
        throw new IOException("Multipart upload to S3 failed for key " + key, e);
    }
}

/**
 * Calculate the MD5 checksum of one slice of a file (used to verify each part
 * of a multipart upload against the ETag returned by S3).
 *
 * @param file   the file being uploaded
 * @param offset position of the first byte of the part within the file
 * @param length number of bytes in the part
 * @param digest the message digest computing the checksum; the final
 *               {@code digest()} call resets it, so it can be reused per part
 * @return lowercase hex string of the part's MD5 checksum
 * @throws IOException if an I/O error occurs, or the file ends before
 *                     {@code offset} is reached
 */
public static String calculatePartChecksum(File file, long offset, long length, MessageDigest digest)
        throws IOException {
    try (FileInputStream fis = new FileInputStream(file);
         DigestInputStream dis = new DigestInputStream(fis, digest)) {
        // Bug fix: InputStream.skip() may skip fewer bytes than requested and
        // its return value was ignored, which could silently digest the wrong
        // slice. Loop until the full offset is consumed. Skip on the raw
        // stream so the skipped bytes do not enter the digest.
        long toSkip = offset;
        while (toSkip > 0) {
            long skipped = fis.skip(toSkip);
            if (skipped <= 0) {
                throw new IOException("Unable to skip to offset " + offset + " in file " + file);
            }
            toSkip -= skipped;
        }

        // Digest exactly `length` bytes through the DigestInputStream
        // (stops early at EOF, matching the previous copyLarge behavior).
        byte[] buffer = new byte[8192];
        long remaining = length;
        while (remaining > 0) {
            int chunk = (int) Math.min(buffer.length, remaining);
            int read = dis.read(buffer, 0, chunk);
            if (read < 0) {
                break;
            }
            remaining -= read;
        }

        // Lowercase hex, matching the format of S3 ETags
        StringBuilder hex = new StringBuilder(digest.getDigestLength() * 2);
        for (byte b : digest.digest()) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }
}
}
2 changes: 2 additions & 0 deletions dspace/config/clarin-dspace.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ file.preview.enabled = false
### Storage service ###
# Synchronization is NOT enabled by default; it is explicitly enabled here
sync.storage.service.enabled = true
# Upload large files by parts and verify the checksum of every part
s3.upload.by.parts.enabled = true


### The build version is stored in the specific file ###
Expand Down
Loading