Skip to content

Commit

Permalink
ufal/s3-check-etag (#537)
Browse files Browse the repository at this point in the history
* Temp commit - upload a file by uploading 50MB parts

* Added uploading the file by parts - a checksum is computed for every part and compared with the UploadPartResult's ETag. This feature can be enabled or disabled via configuration.

* Undo S3BitStoreService changes

* Fixed checkstyle issues

* Prettify the code

* Renamed the cfg property to be more understandable and extracted the part file size into a constant.
  • Loading branch information
milanmajchrak committed Jun 19, 2024
1 parent d3dc661 commit 1c03d77
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,22 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.amazonaws.services.s3.transfer.Upload;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.dspace.content.Bitstream;
Expand All @@ -38,6 +48,16 @@ public class SyncS3BitStoreService extends S3BitStoreService {
private static final Logger log = LogManager.getLogger(SyncS3BitStoreService.class);
private boolean syncEnabled = false;

/**
* The uploading file is divided into parts and each part is uploaded separately. The size of the part is 50 MB.
*/
private static final long UPLOAD_FILE_PART_SIZE = 50 * 1024 * 1024; // 50 MB

/**
* Upload large file by parts - check the checksum of every part
*/
private boolean uploadByParts = false;

@Autowired(required = true)
DSBitStoreService dsBitStoreService;

Expand All @@ -48,9 +68,29 @@ public SyncS3BitStoreService() {
super();
}

/**
* Define syncEnabled and uploadByParts in the constructor - this values won't be overridden by the configuration
*
* @param syncEnabled if true, the file will be uploaded to the local assetstore
* @param uploadByParts if true, the file will be uploaded by parts
*/
public SyncS3BitStoreService(boolean syncEnabled, boolean uploadByParts) {
super();
this.syncEnabled = syncEnabled;
this.uploadByParts = uploadByParts;
}

public void init() throws IOException {
super.init();
syncEnabled = configurationService.getBooleanProperty("sync.storage.service.enabled", false);

// The syncEnabled and uploadByParts could be set to true in the constructor,
// do not override them by the configuration in this case
if (!syncEnabled) {
syncEnabled = configurationService.getBooleanProperty("sync.storage.service.enabled", false);
}
if (!uploadByParts) {
uploadByParts = configurationService.getBooleanProperty("s3.upload.by.parts.enabled", false);
}
}

@Override
Expand All @@ -66,9 +106,11 @@ public void put(Bitstream bitstream, InputStream in) throws IOException {
Utils.bufferedCopy(dis, fos);
in.close();

Upload upload = tm.upload(getBucketName(), key, scratchFile);

upload.waitForUploadResult();
if (uploadByParts) {
uploadByParts(key, scratchFile);
} else {
uploadFluently(key, scratchFile);
}

bitstream.setSizeBytes(scratchFile.length());
// we cannot use the S3 ETAG here as it could be not a MD5 in case of multipart upload (large files) or if
Expand Down Expand Up @@ -119,6 +161,7 @@ public void remove(Bitstream bitstream) throws IOException {

/**
* Create a new file in the assetstore if it does not exist
*
* @param localFile
* @throws IOException
*/
Expand All @@ -137,4 +180,116 @@ private void createFileIfNotExist(File localFile) throws IOException {
" was not created");
}
}

/**
* Upload a file fluently. The file is uploaded in a single request.
*
* @param key the bitstream's internalId
* @param scratchFile the file to upload
* @throws InterruptedException if the S3 upload is interrupted
*/
private void uploadFluently(String key, File scratchFile) throws InterruptedException {
Upload upload = tm.upload(getBucketName(), key, scratchFile);

upload.waitForUploadResult();
}

/**
* Upload a file by parts. The file is divided into parts and each part is uploaded separately.
* The checksum of each part is checked. If the checksum does not match, the file is not uploaded.
*
* @param key the bitstream's internalId
* @param scratchFile the file to upload
* @throws IOException if an I/O error occurs
*/
private void uploadByParts(String key, File scratchFile) throws IOException {
// Initialize MessageDigest for computing checksum
MessageDigest digest;
try {
digest = MessageDigest.getInstance("MD5");
} catch (Exception e) {
throw new RuntimeException("MD5 algorithm not available", e);
}

// Initiate multipart upload
InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(getBucketName(), key);
String uploadId = this.s3Service.initiateMultipartUpload(initiateRequest).getUploadId();

// Create a list to hold the ETags for individual parts
List<PartETag> partETags = new ArrayList<>();

try {
// Upload parts
File file = new File(scratchFile.getPath());
long fileLength = file.length();
long remainingBytes = fileLength;
int partNumber = 1;

while (remainingBytes > 0) {
long bytesToUpload = Math.min(UPLOAD_FILE_PART_SIZE, remainingBytes);

// Calculate the checksum for the part
String partChecksum = calculatePartChecksum(file, fileLength - remainingBytes, bytesToUpload, digest);

UploadPartRequest uploadRequest = new UploadPartRequest()
.withBucketName(this.getBucketName())
.withKey(key)
.withUploadId(uploadId)
.withPartNumber(partNumber)
.withFile(file)
.withFileOffset(fileLength - remainingBytes)
.withPartSize(bytesToUpload);

// Upload the part
UploadPartResult uploadPartResponse = this.s3Service.uploadPart(uploadRequest);

// Collect the ETag for the part
partETags.add(uploadPartResponse.getPartETag());

// Compare checksums - local with ETag
if (!StringUtils.equals(uploadPartResponse.getETag(), partChecksum)) {
String errorMessage = "Checksums do not match error: The locally computed checksum does " +
"not match with the ETag from the UploadPartResult. Local checksum: " + partChecksum +
", ETag: " + uploadPartResponse.getETag() + ", partNumber: " + partNumber;
log.error(errorMessage);
throw new IOException(errorMessage);
}

remainingBytes -= bytesToUpload;
partNumber++;
}

// Complete the multipart upload
CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(this.getBucketName(),
key, uploadId, partETags);
this.s3Service.completeMultipartUpload(completeRequest);
} catch (AmazonClientException e) {
log.error("Cannot upload the file by parts because: ", e);
}
}

/**
* Calculate the checksum of the specified part of the file (Multipart upload)
*
* @param file the uploading file
* @param offset the offset in the file
* @param length the length of the part
* @param digest the message digest for computing the checksum
* @return the checksum of the part
* @throws IOException if an I/O error occurs
*/
public static String calculatePartChecksum(File file, long offset, long length, MessageDigest digest)
throws IOException {
try (FileInputStream fis = new FileInputStream(file);
DigestInputStream dis = new DigestInputStream(fis, digest)) {
// Skip to the specified offset
fis.skip(offset);

// Read the specified length
IOUtils.copyLarge(dis, OutputStream.nullOutputStream(), 0, length);

// Convert the digest to a hex string
return Utils.toHex(digest.digest());
}
}
}
2 changes: 2 additions & 0 deletions dspace/config/clarin-dspace.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ file.preview.enabled = false
### Storage service ###
# Synchronization is NOT enabled by default
sync.storage.service.enabled = true
# Upload large file by parts - check the checksum of every part
s3.upload.by.parts.enabled = true


### The build version is stored in the specific file ###
Expand Down

0 comments on commit 1c03d77

Please sign in to comment.