From 7b51fa0cca61ea744a8d1d162d1e8c9911c81448 Mon Sep 17 00:00:00 2001 From: milanmajchrak Date: Tue, 20 Feb 2024 09:54:07 +0100 Subject: [PATCH 1/6] Temp commit - upload a file by uploading 50MB parts --- .../storage/bitstore/S3BitStoreService.java | 253 +++++++++++++++++- 1 file changed, 239 insertions(+), 14 deletions(-) diff --git a/dspace-api/src/main/java/org/dspace/storage/bitstore/S3BitStoreService.java b/dspace-api/src/main/java/org/dspace/storage/bitstore/S3BitStoreService.java index c621aa6efce..6af2d4b2f09 100644 --- a/dspace-api/src/main/java/org/dspace/storage/bitstore/S3BitStoreService.java +++ b/dspace-api/src/main/java/org/dspace/storage/bitstore/S3BitStoreService.java @@ -10,12 +10,15 @@ import static java.lang.String.valueOf; import java.io.File; +import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.security.DigestInputStream; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -24,6 +27,7 @@ import javax.validation.constraints.NotNull; import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; @@ -33,8 +37,13 @@ import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.amazonaws.services.s3.model.UploadPartResult; import com.amazonaws.services.s3.transfer.Download; import com.amazonaws.services.s3.transfer.TransferManager; import com.amazonaws.services.s3.transfer.TransferManagerBuilder; @@ -45,11 +54,14 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.commons.io.IOUtils; import org.apache.commons.io.output.NullOutputStream; +import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpStatus; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.poi.util.ArrayUtil; import org.dspace.content.Bitstream; import org.dspace.core.Utils; import org.dspace.services.ConfigurationService; @@ -109,7 +121,7 @@ public class S3BitStoreService extends BaseBitStoreService { /** * container for all the assets */ - private String bucketName = null; + protected String bucketName = null; /** * (Optional) subfolder within bucket where objects are stored @@ -127,8 +139,7 @@ public class S3BitStoreService extends BaseBitStoreService { */ protected TransferManager tm = null; - private static final ConfigurationService configurationService - = DSpaceServicesFactory.getInstance().getConfigurationService(); + private ConfigurationService configurationService = null; /** * Utility method for generate AmazonS3 builder @@ -236,6 +247,9 @@ public void init() throws IOException { // bucket name if 
(StringUtils.isEmpty(bucketName)) { // get hostname of DSpace UI to use to name bucket + if (configurationService == null) { + configurationService = DSpaceServicesFactory.getInstance().getConfigurationService(); + } String hostname = Utils.getHostName(configurationService.getProperty("dspace.ui.url")); bucketName = DEFAULT_BUCKET_PREFIX + hostname; log.warn("S3 BucketName is not configured, setting default: " + bucketName); @@ -559,6 +573,19 @@ public void setPathStyleAccessEnabled(boolean pathStyleAccessEnabled) { * @throws Exception generic exception */ public static void main(String[] args) throws Exception { + + + class TestBitstream extends Bitstream { + public TestBitstream() { + super(); + } + + public TestBitstream(String internalId) { + super(); + setInternalId(internalId); + } + } + //TODO Perhaps refactor to be a unit test. Can't mock this without keys though. // parse command line @@ -582,30 +609,198 @@ public static void main(String[] args) throws Exception { } catch (ParseException e) { System.err.println(e.getMessage()); new HelpFormatter().printHelp( - S3BitStoreService.class.getSimpleName() + "options", options); + SyncS3BitStoreService.class.getSimpleName() + "options", options); return; } String accessKey = command.getOptionValue("a"); String secretKey = command.getOptionValue("s"); - S3BitStoreService store = new S3BitStoreService(); + SyncS3BitStoreService store = new SyncS3BitStoreService(); AWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey); - store.s3Service = AmazonS3ClientBuilder.standard() - .withCredentials(new AWSStaticCredentialsProvider(awsCredentials)) - .build(); + AwsClientBuilder.EndpointConfiguration ec = + new AwsClientBuilder.EndpointConfiguration("https://s3.cl4.du.cesnet.cz", ""); + store.s3Service = FunctionalUtils.getDefaultOrBuild( + store.s3Service, + amazonClientBuilderBy(ec, awsCredentials, true) + ); + store.bucketName = "testbucket"; + + store.tm = FunctionalUtils.getDefaultOrBuild(store.tm, () -> TransferManagerBuilder.standard() + .withAlwaysCalculateMultipartMd5(true) + .withS3Client(store.s3Service) + .build()); - //Todo configurable region - Region usEast1 = Region.getRegion(Regions.US_EAST_1); - store.s3Service.setRegion(usEast1); + String id = store.generateId(); + String assetFile = command.getOptionValue("f"); + System.out.println("put() file " + assetFile + " under ID " + id + ": "); + FileInputStream fis = new FileInputStream(assetFile); + //TODO create bitstream for assetfile... 
+ TestBitstream bitstream = new TestBitstream(id); + + // Initialize MessageDigest for computing checksum + MessageDigest digest; + try { + digest = MessageDigest.getInstance("MD5"); + } catch (Exception e) { + throw new RuntimeException("MD5 algorithm not available", e); + } + + // Initiate multipart upload + InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(store.getBucketName(), bitstream.getInternalId()); + String uploadId = store.s3Service.initiateMultipartUpload(initiateRequest).getUploadId(); + long start1 = System.currentTimeMillis(); + // Upload a file in multiple parts, one part has 50MB + // Set part size + long partSize = 50 * 1024 * 1024; // 50 MB + + // Create a list to hold the ETags for individual parts + List partETags = new ArrayList<>(); + + try { + // Upload parts + File file = new File(assetFile); + long fileLength = file.length(); + long remainingBytes = fileLength; + int partNumber = 1; + + while (remainingBytes > 0) { + long bytesToUpload = Math.min(partSize, remainingBytes); + + // Calculate checksum for the part + String partChecksum = calculateChecksum(file, fileLength - remainingBytes, bytesToUpload, digest); + + UploadPartRequest uploadRequest = new UploadPartRequest() + .withBucketName(store.getBucketName()) + .withKey(bitstream.getInternalId()) + .withUploadId(uploadId) + .withPartNumber(partNumber) + .withFile(file) + .withFileOffset(fileLength - remainingBytes) + .withPartSize(bytesToUpload); + + // Upload the part + UploadPartResult uploadPartResponse = store.s3Service.uploadPart(uploadRequest); + + // Collect the ETag for the part + partETags.add(uploadPartResponse.getPartETag()); + + // Compare checksums - local with ETag + if (!StringUtils.equals(uploadPartResponse.getETag(), partChecksum)) { + String errorMessage = "Checksum does not match error: The locally computed checksum does not match with ETag " + + "from the UploadPartResult. 
Local checksum: " + partChecksum + ", ETag: " + + uploadPartResponse.getETag() + ", partNumber: " + partNumber; + log.error(errorMessage); + throw new IOException(errorMessage); + } + + remainingBytes -= bytesToUpload; + partNumber++; + } + + // Complete the multipart upload + CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(store.getBucketName(), bitstream.getInternalId(), uploadId, partETags); + store.s3Service.completeMultipartUpload(completeRequest); + + long now1 = System.currentTimeMillis(); + System.out.println("Part uploading: " + (now1 - start1) + " msecs"); + } catch (AmazonClientException e) { + e.printStackTrace(); + } + +// WORKING START + + // Case 1: store a file + long start2 = System.currentTimeMillis(); + store.put(bitstream, fis); +// System.out.println("Downloading started"); + long now2 = System.currentTimeMillis(); + System.out.println("Fluent uploading: " + (now2 - start2) + " msecs"); + InputStream downloaded = store.get(bitstream); + FileInputStream fisCheck = new FileInputStream(assetFile); +// System.out.print(ArrayUtils.isEquals(downloaded.readAllBytes(), fisCheck.readAllBytes())); + + S3BitStoreService.storeInputStreamToFile(downloaded, "C:\\Users\\MilanMajchrák\\Videos\\Captures\\test3.mp4"); + System.exit(0); +// WORKING END + + +// long now = System.currentTimeMillis(); +// System.out.println((now - start) + " msecs"); +// start = now; +// // examine the metadata returned +// Iterator iter = attrs.keySet().iterator(); +// System.out.println("Metadata after put():"); +// while (iter.hasNext()) +// { +// String key = (String)iter.next(); +// System.out.println( key + ": " + (String)attrs.get(key) ); +// } +// // Case 2: get metadata and compare +// System.out.print("about() file with ID " + id + ": "); +// Map attrs2 = store.about(id, attrs); +// now = System.currentTimeMillis(); +// System.out.println((now - start) + " msecs"); +// start = now; +// iter = attrs2.keySet().iterator(); +// System.out.println("Metadata after about():"); +// while (iter.hasNext()) +// { +// String key = (String)iter.next(); +// System.out.println( key + ": " + (String)attrs.get(key) ); +// } +// // Case 3: retrieve asset and compare bits +// System.out.print("get() file with ID " + id + ": "); +// java.io.FileOutputStream fos = new java.io.FileOutputStream(assetFile+".echo"); +// InputStream in = store.get(id); +// Utils.bufferedCopy(in, fos); +// fos.close(); +// in.close(); +// now = System.currentTimeMillis(); +// System.out.println((now - start) + " msecs"); +// start = now; +// // Case 4: remove asset +// System.out.print("remove() file with ID: " + id + ": "); +// store.remove(id); +// now = System.currentTimeMillis(); +// System.out.println((now - start) + " msecs"); +// System.out.flush(); +// // should get nothing back now - will throw exception +// store.get(id); + + + + + + + + + + + + + + + +// store.s3Service = AmazonS3ClientBuilder.standard() +// .withCredentials(new AWSStaticCredentialsProvider(awsCredentials)) +// .withRegion(Regions.EU_CENTRAL_1) +// .build(); +// +// //Todo configurable region +// Region usEast1 = Region.getRegion(Regions.US_EAST_1); +// store.s3Service.setRegion(usEast1); // get hostname of DSpace UI to use to name bucket - String hostname = Utils.getHostName(configurationService.getProperty("dspace.ui.url")); + +// ConfigurationService configurationService = DSpaceServicesFactory.getInstance().getConfigurationService(); +// String hostname = Utils.getHostName(configurationService.getProperty("dspace.ui.url")); 
+ String hostname = Utils.getHostName("http://localhost:4000"); //Bucketname should be lowercase - store.bucketName = DEFAULT_BUCKET_PREFIX + hostname + ".s3test"; - store.s3Service.createBucket(store.bucketName); +// store.bucketName = DEFAULT_BUCKET_PREFIX + hostname + ".s3test"; +// store.s3Service.createBucket(store.bucketName); /* Broken in DSpace 6 TODO Refactor // time everything, todo, swtich to caliper long start = System.currentTimeMillis(); @@ -660,6 +855,22 @@ public static void main(String[] args) throws Exception { */ } + private static String calculateChecksum(File file, long offset, long length, MessageDigest digest) throws IOException { + try (FileInputStream fis = new FileInputStream(file); + DigestInputStream dis = new DigestInputStream(fis, digest)) { + // Skip to the specified offset + fis.skip(offset); + + // Read the specified length + IOUtils.copyLarge(dis, OutputStream.nullOutputStream(), 0, length); + // Use Apache Commons IO to copy the specified length +// Utils.bufferedCopy(dis, OutputStream.nullOutputStream()); + + // Convert the digest to a hex string + return Utils.toHex(digest.digest()); + } + } + /** * Is this a registered bitstream? (not stored via this service originally) * @param internalId @@ -669,4 +880,18 @@ public boolean isRegisteredBitstream(String internalId) { return internalId.startsWith(REGISTERED_FLAG); } + public static void storeInputStreamToFile(InputStream inputStream, String filePath) throws IOException { + // Open the output file stream + try (OutputStream outputStream = new FileOutputStream(filePath)) { + // Create a buffer to read and write data + byte[] buffer = new byte[4096]; + int bytesRead; + + // Read from the InputStream and write to the file + while ((bytesRead = inputStream.read(buffer)) != -1) { + outputStream.write(buffer, 0, bytesRead); + } + } + } + } From ac2cbb50eccc5e5d34696020789386e5ab730e6c Mon Sep 17 00:00:00 2001 From: milanmajchrak Date: Tue, 20 Feb 2024 11:03:24 +0100 Subject: [PATCH 2/6] Added uploading the file by parts - for every part is computed checksum and compared with UploadPartResult's ETag. This feature could be enabled or disabled by cfg. 
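For illustration, a minimal sketch (not part of the patch) of how the new flags
can be toggled, based on the constructor and init() logic added below; note
that PATCH 6/6 later renames the property to s3.upload.by.parts.enabled:

    // Option 1: via configuration (clarin-dspace.cfg)
    //   upload.by.parts.enabled = true
    // Option 2: via the new two-argument constructor; init() only falls back
    // to the configuration for flags the constructor left at false
    SyncS3BitStoreService store = new SyncS3BitStoreService(true, true);
    store.init(); // syncEnabled and uploadByParts stay true regardless of cfg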
--- .../bitstore/SyncS3BitStoreService.java | 159 +++++++++++++++++- dspace/config/clarin-dspace.cfg | 2 + 2 files changed, 157 insertions(+), 4 deletions(-) diff --git a/dspace-api/src/main/java/org/dspace/storage/bitstore/SyncS3BitStoreService.java b/dspace-api/src/main/java/org/dspace/storage/bitstore/SyncS3BitStoreService.java index 8077a966cf3..daa94e44c90 100644 --- a/dspace-api/src/main/java/org/dspace/storage/bitstore/SyncS3BitStoreService.java +++ b/dspace-api/src/main/java/org/dspace/storage/bitstore/SyncS3BitStoreService.java @@ -12,12 +12,22 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.security.DigestInputStream; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.List; import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.PartETag; +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.amazonaws.services.s3.model.UploadPartResult; import com.amazonaws.services.s3.transfer.Upload; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.dspace.content.Bitstream; @@ -38,6 +48,12 @@ public class SyncS3BitStoreService extends S3BitStoreService { private static final Logger log = LogManager.getLogger(SyncS3BitStoreService.class); private boolean syncEnabled = false; + /** + * Upload large file by parts - check the checksum of every part + */ + private boolean uploadByParts = false; + + @Autowired(required = true) DSBitStoreService dsBitStoreService; @@ -48,9 +64,29 @@ public SyncS3BitStoreService() { super(); } + /** + * Define syncEnabled and uploadByParts in the constructor - this values won't be overridden by the configuration + * + * @param syncEnabled if true, the file will be uploaded to the local assetstore + * @param uploadByParts if true, the file will be uploaded by parts + */ + public SyncS3BitStoreService(boolean syncEnabled, boolean uploadByParts) { + super(); + this.syncEnabled = syncEnabled; + this.uploadByParts = uploadByParts; + } + public void init() throws IOException { super.init(); - syncEnabled = configurationService.getBooleanProperty("sync.storage.service.enabled", false); + + // The syncEnabled and uploadByParts could be set to true in the constructor, + // do not override them by the configuration in this case + if (!syncEnabled) { + syncEnabled = configurationService.getBooleanProperty("sync.storage.service.enabled", false); + } + if (!uploadByParts) { + uploadByParts = configurationService.getBooleanProperty("upload.by.parts.enabled", false); + } } @Override @@ -66,9 +102,11 @@ public void put(Bitstream bitstream, InputStream in) throws IOException { Utils.bufferedCopy(dis, fos); in.close(); - Upload upload = tm.upload(getBucketName(), key, scratchFile); - - upload.waitForUploadResult(); + if (uploadByParts) { + uploadByParts(key, scratchFile); + } else { + uploadFluently(key, scratchFile); + } bitstream.setSizeBytes(scratchFile.length()); // we cannot use the S3 ETAG here as it could be not a MD5 in case of multipart upload (large files) or if @@ -119,6 +157,7 @@ public void remove(Bitstream bitstream) throws IOException { /** * Create a new file in the assetstore if it does not 
exist + * * @param localFile * @throws IOException */ @@ -137,4 +176,116 @@ private void createFileIfNotExist(File localFile) throws IOException { " was not created"); } } + + /** + * Upload a file fluently. The file is uploaded in a single request. + * + * @param key the bitstream's internalId + * @param scratchFile the file to upload + * @throws InterruptedException if the S3 upload is interrupted + */ + private void uploadFluently(String key, File scratchFile) throws InterruptedException { + Upload upload = tm.upload(getBucketName(), key, scratchFile); + + upload.waitForUploadResult(); + } + + /** + * Upload a file by parts. The file is divided into parts and each part is uploaded separately. + * The checksum of each part is checked. If the checksum does not match, the file is not uploaded. + * + * @param key the bitstream's internalId + * @param scratchFile the file to upload + * @throws IOException if an I/O error occurs + */ + private void uploadByParts(String key, File scratchFile) throws IOException { + // Initialize MessageDigest for computing checksum + MessageDigest digest; + try { + digest = MessageDigest.getInstance("MD5"); + } catch (Exception e) { + throw new RuntimeException("MD5 algorithm not available", e); + } + + // Initiate multipart upload + InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(getBucketName(), key); + String uploadId = this.s3Service.initiateMultipartUpload(initiateRequest).getUploadId(); + // Upload a file in multiple parts, one part has 50MB + long partSize = 50 * 1024 * 1024; // 50 MB + + // Create a list to hold the ETags for individual parts + List partETags = new ArrayList<>(); + + try { + // Upload parts + File file = new File(scratchFile.getPath()); + long fileLength = file.length(); + long remainingBytes = fileLength; + int partNumber = 1; + + while (remainingBytes > 0) { + long bytesToUpload = Math.min(partSize, remainingBytes); + + // Calculate the checksum for the part + String partChecksum = calculatePartChecksum(file, fileLength - remainingBytes, bytesToUpload, digest); + + UploadPartRequest uploadRequest = new UploadPartRequest() + .withBucketName(this.getBucketName()) + .withKey(key) + .withUploadId(uploadId) + .withPartNumber(partNumber) + .withFile(file) + .withFileOffset(fileLength - remainingBytes) + .withPartSize(bytesToUpload); + + // Upload the part + UploadPartResult uploadPartResponse = this.s3Service.uploadPart(uploadRequest); + + // Collect the ETag for the part + partETags.add(uploadPartResponse.getPartETag()); + + // Compare checksums - local with ETag + if (!StringUtils.equals(uploadPartResponse.getETag(), partChecksum)) { + String errorMessage = "Checksums do not match error: The locally computed checksum does " + + "not match with the ETag from the UploadPartResult. 
Local checksum: " + partChecksum + + ", ETag: " + uploadPartResponse.getETag() + ", partNumber: " + partNumber; + log.error(errorMessage); + throw new IOException(errorMessage); + } + + remainingBytes -= bytesToUpload; + partNumber++; + } + + // Complete the multipart upload + CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(this.getBucketName(), key, uploadId, partETags); + this.s3Service.completeMultipartUpload(completeRequest); + } catch (AmazonClientException e) { + log.error("Cannot upload the file by parts because: ", e); + } + } + + /** + * Calculate the checksum of the specified part of the file (Multipart upload) + * + * @param file the uploading file + * @param offset the offset in the file + * @param length the length of the part + * @param digest the message digest for computing the checksum + * @return the checksum of the part + * @throws IOException if an I/O error occurs + */ + public static String calculatePartChecksum(File file, long offset, long length, MessageDigest digest) throws IOException { + try (FileInputStream fis = new FileInputStream(file); + DigestInputStream dis = new DigestInputStream(fis, digest)) { + // Skip to the specified offset + fis.skip(offset); + + // Read the specified length + IOUtils.copyLarge(dis, OutputStream.nullOutputStream(), 0, length); + + // Convert the digest to a hex string + return Utils.toHex(digest.digest()); + } + } } diff --git a/dspace/config/clarin-dspace.cfg b/dspace/config/clarin-dspace.cfg index a13f504ab9a..5379ec8e7c4 100644 --- a/dspace/config/clarin-dspace.cfg +++ b/dspace/config/clarin-dspace.cfg @@ -246,6 +246,8 @@ file.preview.enabled = false ### Storage service ### # Synchronization is NOT enabled by default sync.storage.service.enabled = true +# Upload large file by parts - check the checksum of every part +upload.by.parts.enabled = true ### The build version is stored in the specific file ### From c0d65d262db5489dde210af2097e62732b7095aa Mon Sep 17 00:00:00 2001 From: milanmajchrak Date: Tue, 20 Feb 2024 11:05:24 +0100 Subject: [PATCH 3/6] Undo S3BitStoreService changes --- .../storage/bitstore/S3BitStoreService.java | 253 +----------------- 1 file changed, 14 insertions(+), 239 deletions(-) diff --git a/dspace-api/src/main/java/org/dspace/storage/bitstore/S3BitStoreService.java b/dspace-api/src/main/java/org/dspace/storage/bitstore/S3BitStoreService.java index 6af2d4b2f09..c621aa6efce 100644 --- a/dspace-api/src/main/java/org/dspace/storage/bitstore/S3BitStoreService.java +++ b/dspace-api/src/main/java/org/dspace/storage/bitstore/S3BitStoreService.java @@ -10,15 +10,12 @@ import static java.lang.String.valueOf; import java.io.File; -import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.security.DigestInputStream; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -27,7 +24,6 @@ import javax.validation.constraints.NotNull; import com.amazonaws.AmazonClientException; -import com.amazonaws.AmazonServiceException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; @@ -37,13 +33,8 @@ import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.model.AmazonS3Exception; -import 
com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; import com.amazonaws.services.s3.model.GetObjectRequest; -import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.PartETag; -import com.amazonaws.services.s3.model.UploadPartRequest; -import com.amazonaws.services.s3.model.UploadPartResult; import com.amazonaws.services.s3.transfer.Download; import com.amazonaws.services.s3.transfer.TransferManager; import com.amazonaws.services.s3.transfer.TransferManagerBuilder; @@ -54,14 +45,11 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; -import org.apache.commons.io.IOUtils; import org.apache.commons.io.output.NullOutputStream; -import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpStatus; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.poi.util.ArrayUtil; import org.dspace.content.Bitstream; import org.dspace.core.Utils; import org.dspace.services.ConfigurationService; @@ -121,7 +109,7 @@ public class S3BitStoreService extends BaseBitStoreService { /** * container for all the assets */ - protected String bucketName = null; + private String bucketName = null; /** * (Optional) subfolder within bucket where objects are stored @@ -139,7 +127,8 @@ public class S3BitStoreService extends BaseBitStoreService { */ protected TransferManager tm = null; - private ConfigurationService configurationService = null; + private static final ConfigurationService configurationService + = DSpaceServicesFactory.getInstance().getConfigurationService(); /** * Utility method for generate AmazonS3 builder @@ -247,9 +236,6 @@ public void init() throws IOException { // bucket name if (StringUtils.isEmpty(bucketName)) { // get hostname of DSpace UI to use to name bucket - if (configurationService == null) { - configurationService = DSpaceServicesFactory.getInstance().getConfigurationService(); - } String hostname = Utils.getHostName(configurationService.getProperty("dspace.ui.url")); bucketName = DEFAULT_BUCKET_PREFIX + hostname; log.warn("S3 BucketName is not configured, setting default: " + bucketName); @@ -573,19 +559,6 @@ public void setPathStyleAccessEnabled(boolean pathStyleAccessEnabled) { * @throws Exception generic exception */ public static void main(String[] args) throws Exception { - - - class TestBitstream extends Bitstream { - public TestBitstream() { - super(); - } - - public TestBitstream(String internalId) { - super(); - setInternalId(internalId); - } - } - //TODO Perhaps refactor to be a unit test. Can't mock this without keys though. 
// parse command line @@ -609,198 +582,30 @@ public TestBitstream(String internalId) { } catch (ParseException e) { System.err.println(e.getMessage()); new HelpFormatter().printHelp( - SyncS3BitStoreService.class.getSimpleName() + "options", options); + S3BitStoreService.class.getSimpleName() + "options", options); return; } String accessKey = command.getOptionValue("a"); String secretKey = command.getOptionValue("s"); - SyncS3BitStoreService store = new SyncS3BitStoreService(); + S3BitStoreService store = new S3BitStoreService(); AWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey); - AwsClientBuilder.EndpointConfiguration ec = - new AwsClientBuilder.EndpointConfiguration("https://s3.cl4.du.cesnet.cz", ""); - store.s3Service = FunctionalUtils.getDefaultOrBuild( - store.s3Service, - amazonClientBuilderBy(ec, awsCredentials, true) - ); - store.bucketName = "testbucket"; - - store.tm = FunctionalUtils.getDefaultOrBuild(store.tm, () -> TransferManagerBuilder.standard() - .withAlwaysCalculateMultipartMd5(true) - .withS3Client(store.s3Service) - .build()); + store.s3Service = AmazonS3ClientBuilder.standard() + .withCredentials(new AWSStaticCredentialsProvider(awsCredentials)) + .build(); - String id = store.generateId(); - String assetFile = command.getOptionValue("f"); - System.out.println("put() file " + assetFile + " under ID " + id + ": "); - FileInputStream fis = new FileInputStream(assetFile); - //TODO create bitstream for assetfile... - TestBitstream bitstream = new TestBitstream(id); - - // Initialize MessageDigest for computing checksum - MessageDigest digest; - try { - digest = MessageDigest.getInstance("MD5"); - } catch (Exception e) { - throw new RuntimeException("MD5 algorithm not available", e); - } - - // Initiate multipart upload - InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(store.getBucketName(), bitstream.getInternalId()); - String uploadId = store.s3Service.initiateMultipartUpload(initiateRequest).getUploadId(); - long start1 = System.currentTimeMillis(); - // Upload a file in multiple parts, one part has 50MB - // Set part size - long partSize = 50 * 1024 * 1024; // 50 MB - - // Create a list to hold the ETags for individual parts - List partETags = new ArrayList<>(); - - try { - // Upload parts - File file = new File(assetFile); - long fileLength = file.length(); - long remainingBytes = fileLength; - int partNumber = 1; - - while (remainingBytes > 0) { - long bytesToUpload = Math.min(partSize, remainingBytes); - - // Calculate checksum for the part - String partChecksum = calculateChecksum(file, fileLength - remainingBytes, bytesToUpload, digest); - - UploadPartRequest uploadRequest = new UploadPartRequest() - .withBucketName(store.getBucketName()) - .withKey(bitstream.getInternalId()) - .withUploadId(uploadId) - .withPartNumber(partNumber) - .withFile(file) - .withFileOffset(fileLength - remainingBytes) - .withPartSize(bytesToUpload); - - // Upload the part - UploadPartResult uploadPartResponse = store.s3Service.uploadPart(uploadRequest); - - // Collect the ETag for the part - partETags.add(uploadPartResponse.getPartETag()); - - // Compare checksums - local with ETag - if (!StringUtils.equals(uploadPartResponse.getETag(), partChecksum)) { - String errorMessage = "Checksum does not match error: The locally computed checksum does not match with ETag " + - "from the UploadPartResult. 
Local checksum: " + partChecksum + ", ETag: " + - uploadPartResponse.getETag() + ", partNumber: " + partNumber; - log.error(errorMessage); - throw new IOException(errorMessage); - } - - remainingBytes -= bytesToUpload; - partNumber++; - } - - // Complete the multipart upload - CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(store.getBucketName(), bitstream.getInternalId(), uploadId, partETags); - store.s3Service.completeMultipartUpload(completeRequest); - - long now1 = System.currentTimeMillis(); - System.out.println("Part uploading: " + (now1 - start1) + " msecs"); - } catch (AmazonClientException e) { - e.printStackTrace(); - } - -// WORKING START - - // Case 1: store a file - long start2 = System.currentTimeMillis(); - store.put(bitstream, fis); -// System.out.println("Downloading started"); - long now2 = System.currentTimeMillis(); - System.out.println("Fluent uploading: " + (now2 - start2) + " msecs"); - InputStream downloaded = store.get(bitstream); - FileInputStream fisCheck = new FileInputStream(assetFile); -// System.out.print(ArrayUtils.isEquals(downloaded.readAllBytes(), fisCheck.readAllBytes())); - - S3BitStoreService.storeInputStreamToFile(downloaded, "C:\\Users\\MilanMajchrák\\Videos\\Captures\\test3.mp4"); - System.exit(0); -// WORKING END - - -// long now = System.currentTimeMillis(); -// System.out.println((now - start) + " msecs"); -// start = now; -// // examine the metadata returned -// Iterator iter = attrs.keySet().iterator(); -// System.out.println("Metadata after put():"); -// while (iter.hasNext()) -// { -// String key = (String)iter.next(); -// System.out.println( key + ": " + (String)attrs.get(key) ); -// } -// // Case 2: get metadata and compare -// System.out.print("about() file with ID " + id + ": "); -// Map attrs2 = store.about(id, attrs); -// now = System.currentTimeMillis(); -// System.out.println((now - start) + " msecs"); -// start = now; -// iter = attrs2.keySet().iterator(); -// System.out.println("Metadata after about():"); -// while (iter.hasNext()) -// { -// String key = (String)iter.next(); -// System.out.println( key + ": " + (String)attrs.get(key) ); -// } -// // Case 3: retrieve asset and compare bits -// System.out.print("get() file with ID " + id + ": "); -// java.io.FileOutputStream fos = new java.io.FileOutputStream(assetFile+".echo"); -// InputStream in = store.get(id); -// Utils.bufferedCopy(in, fos); -// fos.close(); -// in.close(); -// now = System.currentTimeMillis(); -// System.out.println((now - start) + " msecs"); -// start = now; -// // Case 4: remove asset -// System.out.print("remove() file with ID: " + id + ": "); -// store.remove(id); -// now = System.currentTimeMillis(); -// System.out.println((now - start) + " msecs"); -// System.out.flush(); -// // should get nothing back now - will throw exception -// store.get(id); - - - - - - - - - - - - - - - -// store.s3Service = AmazonS3ClientBuilder.standard() -// .withCredentials(new AWSStaticCredentialsProvider(awsCredentials)) -// .withRegion(Regions.EU_CENTRAL_1) -// .build(); -// -// //Todo configurable region -// Region usEast1 = Region.getRegion(Regions.US_EAST_1); -// store.s3Service.setRegion(usEast1); + //Todo configurable region + Region usEast1 = Region.getRegion(Regions.US_EAST_1); + store.s3Service.setRegion(usEast1); // get hostname of DSpace UI to use to name bucket - -// ConfigurationService configurationService = DSpaceServicesFactory.getInstance().getConfigurationService(); -// String hostname = 
Utils.getHostName(configurationService.getProperty("dspace.ui.url")); - String hostname = Utils.getHostName("http://localhost:4000"); + String hostname = Utils.getHostName(configurationService.getProperty("dspace.ui.url")); //Bucketname should be lowercase -// store.bucketName = DEFAULT_BUCKET_PREFIX + hostname + ".s3test"; -// store.s3Service.createBucket(store.bucketName); + store.bucketName = DEFAULT_BUCKET_PREFIX + hostname + ".s3test"; + store.s3Service.createBucket(store.bucketName); /* Broken in DSpace 6 TODO Refactor // time everything, todo, swtich to caliper long start = System.currentTimeMillis(); @@ -855,22 +660,6 @@ public TestBitstream(String internalId) { */ } - private static String calculateChecksum(File file, long offset, long length, MessageDigest digest) throws IOException { - try (FileInputStream fis = new FileInputStream(file); - DigestInputStream dis = new DigestInputStream(fis, digest)) { - // Skip to the specified offset - fis.skip(offset); - - // Read the specified length - IOUtils.copyLarge(dis, OutputStream.nullOutputStream(), 0, length); - // Use Apache Commons IO to copy the specified length -// Utils.bufferedCopy(dis, OutputStream.nullOutputStream()); - - // Convert the digest to a hex string - return Utils.toHex(digest.digest()); - } - } - /** * Is this a registered bitstream? (not stored via this service originally) * @param internalId @@ -880,18 +669,4 @@ public boolean isRegisteredBitstream(String internalId) { return internalId.startsWith(REGISTERED_FLAG); } - public static void storeInputStreamToFile(InputStream inputStream, String filePath) throws IOException { - // Open the output file stream - try (OutputStream outputStream = new FileOutputStream(filePath)) { - // Create a buffer to read and write data - byte[] buffer = new byte[4096]; - int bytesRead; - - // Read from the InputStream and write to the file - while ((bytesRead = inputStream.read(buffer)) != -1) { - outputStream.write(buffer, 0, bytesRead); - } - } - } - } From 8f8318f478cd2fcab6c9b274a8a5b70eea9c0a77 Mon Sep 17 00:00:00 2001 From: milanmajchrak Date: Tue, 20 Feb 2024 12:31:43 +0100 Subject: [PATCH 4/6] Fixed checkstyle issues --- .../org/dspace/storage/bitstore/SyncS3BitStoreService.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dspace-api/src/main/java/org/dspace/storage/bitstore/SyncS3BitStoreService.java b/dspace-api/src/main/java/org/dspace/storage/bitstore/SyncS3BitStoreService.java index daa94e44c90..f5ff14bfc69 100644 --- a/dspace-api/src/main/java/org/dspace/storage/bitstore/SyncS3BitStoreService.java +++ b/dspace-api/src/main/java/org/dspace/storage/bitstore/SyncS3BitStoreService.java @@ -258,7 +258,8 @@ private void uploadByParts(String key, File scratchFile) throws IOException { } // Complete the multipart upload - CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(this.getBucketName(), key, uploadId, partETags); + CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(this.getBucketName(), + key, uploadId, partETags); this.s3Service.completeMultipartUpload(completeRequest); } catch (AmazonClientException e) { log.error("Cannot upload the file by parts because: ", e); @@ -275,7 +276,8 @@ private void uploadByParts(String key, File scratchFile) throws IOException { * @return the checksum of the part * @throws IOException if an I/O error occurs */ - public static String calculatePartChecksum(File file, long offset, long length, MessageDigest digest) throws IOException { + public static 
String calculatePartChecksum(File file, long offset, long length, MessageDigest digest) + throws IOException { try (FileInputStream fis = new FileInputStream(file); DigestInputStream dis = new DigestInputStream(fis, digest)) { // Skip to the specified offset From 39e469752cafab4a64186b764a0e17cff6a60ccb Mon Sep 17 00:00:00 2001 From: milanmajchrak Date: Tue, 20 Feb 2024 12:33:36 +0100 Subject: [PATCH 5/6] Prettify the code --- .../java/org/dspace/storage/bitstore/SyncS3BitStoreService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/dspace-api/src/main/java/org/dspace/storage/bitstore/SyncS3BitStoreService.java b/dspace-api/src/main/java/org/dspace/storage/bitstore/SyncS3BitStoreService.java index f5ff14bfc69..9c70e30a16e 100644 --- a/dspace-api/src/main/java/org/dspace/storage/bitstore/SyncS3BitStoreService.java +++ b/dspace-api/src/main/java/org/dspace/storage/bitstore/SyncS3BitStoreService.java @@ -53,7 +53,6 @@ public class SyncS3BitStoreService extends S3BitStoreService { */ private boolean uploadByParts = false; - @Autowired(required = true) DSBitStoreService dsBitStoreService; From b4674432b75e9200af8026573de08d0fe10927ab Mon Sep 17 00:00:00 2001 From: milanmajchrak Date: Tue, 20 Feb 2024 14:20:56 +0100 Subject: [PATCH 6/6] Changed cfg property to be better understandable and file size is converted into constant. --- .../storage/bitstore/SyncS3BitStoreService.java | 11 +++++++---- dspace/config/clarin-dspace.cfg | 2 +- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/dspace-api/src/main/java/org/dspace/storage/bitstore/SyncS3BitStoreService.java b/dspace-api/src/main/java/org/dspace/storage/bitstore/SyncS3BitStoreService.java index 9c70e30a16e..ff1e2f86740 100644 --- a/dspace-api/src/main/java/org/dspace/storage/bitstore/SyncS3BitStoreService.java +++ b/dspace-api/src/main/java/org/dspace/storage/bitstore/SyncS3BitStoreService.java @@ -48,6 +48,11 @@ public class SyncS3BitStoreService extends S3BitStoreService { private static final Logger log = LogManager.getLogger(SyncS3BitStoreService.class); private boolean syncEnabled = false; + /** + * The uploading file is divided into parts and each part is uploaded separately. The size of the part is 50 MB. 
+ */ + private static final long UPLOAD_FILE_PART_SIZE = 50 * 1024 * 1024; // 50 MB + /** * Upload large file by parts - check the checksum of every part */ @@ -84,7 +89,7 @@ public void init() throws IOException { syncEnabled = configurationService.getBooleanProperty("sync.storage.service.enabled", false); } if (!uploadByParts) { - uploadByParts = configurationService.getBooleanProperty("upload.by.parts.enabled", false); + uploadByParts = configurationService.getBooleanProperty("s3.upload.by.parts.enabled", false); } } @@ -209,8 +214,6 @@ private void uploadByParts(String key, File scratchFile) throws IOException { // Initiate multipart upload InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(getBucketName(), key); String uploadId = this.s3Service.initiateMultipartUpload(initiateRequest).getUploadId(); - // Upload a file in multiple parts, one part has 50MB - long partSize = 50 * 1024 * 1024; // 50 MB // Create a list to hold the ETags for individual parts List partETags = new ArrayList<>(); @@ -223,7 +226,7 @@ private void uploadByParts(String key, File scratchFile) throws IOException { int partNumber = 1; while (remainingBytes > 0) { - long bytesToUpload = Math.min(partSize, remainingBytes); + long bytesToUpload = Math.min(UPLOAD_FILE_PART_SIZE, remainingBytes); // Calculate the checksum for the part String partChecksum = calculatePartChecksum(file, fileLength - remainingBytes, bytesToUpload, digest); diff --git a/dspace/config/clarin-dspace.cfg b/dspace/config/clarin-dspace.cfg index 5379ec8e7c4..7f47d9896cd 100644 --- a/dspace/config/clarin-dspace.cfg +++ b/dspace/config/clarin-dspace.cfg @@ -247,7 +247,7 @@ file.preview.enabled = false # Synchronization is NOT enabled by default sync.storage.service.enabled = true # Upload large file by parts - check the checksum of every part -upload.by.parts.enabled = true +s3.upload.by.parts.enabled = true ### The build version is stored in the specific file ###
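
For reference, the per-part checksum helper introduced in this series can be
sanity-checked without touching S3: the MD5 of a single part spanning a whole
file must equal the MD5 of the file itself, which is also the ETag S3 reports
for a non-multipart upload. A minimal standalone sketch (class name and
temp-file setup are illustrative only; assumes the DSpace API on the classpath):

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    import org.dspace.core.Utils;
    import org.dspace.storage.bitstore.SyncS3BitStoreService;

    public class PartChecksumSanityCheck {
        public static void main(String[] args) throws IOException, NoSuchAlgorithmException {
            // Write a small temporary file with known content
            File tmp = File.createTempFile("part-checksum", ".bin");
            tmp.deleteOnExit();
            byte[] data = new byte[3 * 1024 * 1024]; // 3 MB, well under one 50 MB part
            try (FileOutputStream fos = new FileOutputStream(tmp)) {
                fos.write(data);
            }

            // MD5 of the whole file, computed directly
            String expected = Utils.toHex(MessageDigest.getInstance("MD5").digest(data));

            // MD5 of one "part" covering the whole file, via the patched helper
            String actual = SyncS3BitStoreService.calculatePartChecksum(
                    tmp, 0, tmp.length(), MessageDigest.getInstance("MD5"));

            System.out.println(expected.equals(actual) ? "checksums match" : "MISMATCH");
        }
    }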