From 76f04f1ed27c9c5b81dd0f2f7094cb5a05c71968 Mon Sep 17 00:00:00 2001 From: Andrea Bollini Date: Mon, 15 Apr 2024 12:56:58 +0200 Subject: [PATCH] Add support for LazyDownload of files from S3 --- .../storage/bitstore/S3BitStoreService.java | 101 +++++++++++++++--- .../storage/bitstore/S3BitStoreServiceIT.java | 19 +++- 2 files changed, 103 insertions(+), 17 deletions(-) diff --git a/dspace-api/src/main/java/org/dspace/storage/bitstore/S3BitStoreService.java b/dspace-api/src/main/java/org/dspace/storage/bitstore/S3BitStoreService.java index afb2eaa97b3..36456a8945e 100644 --- a/dspace-api/src/main/java/org/dspace/storage/bitstore/S3BitStoreService.java +++ b/dspace-api/src/main/java/org/dspace/storage/bitstore/S3BitStoreService.java @@ -10,6 +10,7 @@ import static java.lang.String.valueOf; import java.io.File; +import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; @@ -102,6 +103,11 @@ public class S3BitStoreService extends BaseBitStoreService { private String awsRegionName; private boolean useRelativePath; + /** + * The maximum size of individual chunk to download from S3 when a file is accessed. Default 5Mb + */ + private long bufferSize = 5 * 1024 * 1024; + /** * container for all the assets */ @@ -258,20 +264,7 @@ public InputStream get(Bitstream bitstream) throws IOException { if (isRegisteredBitstream(key)) { key = key.substring(REGISTERED_FLAG.length()); } - try { - File tempFile = File.createTempFile("s3-disk-copy-" + UUID.randomUUID(), "temp"); - tempFile.deleteOnExit(); - - GetObjectRequest getObjectRequest = new GetObjectRequest(bucketName, key); - - Download download = tm.download(getObjectRequest, tempFile); - download.waitForCompletion(); - - return new DeleteOnCloseFileInputStream(tempFile); - } catch (AmazonClientException | InterruptedException e) { - log.error("get(" + key + ")", e); - throw new IOException(e); - } + return new S3LazyInputStream(key, bufferSize, bitstream.getSizeBytes()); } /** @@ -622,4 +615,84 @@ public boolean isRegisteredBitstream(String internalId) { return internalId.startsWith(REGISTERED_FLAG); } + public void setBufferSize(long bufferSize) { + this.bufferSize = bufferSize; + } + + /** + * This inner class represent an InputStream that uses temporary files to + * represent chunk of the object downloaded from S3. When the input stream is + * read the class look first to the current chunk and download a new one once if + * the current one as been fully read. The class is responsible to close a chunk + * as soon as a new one is retrieved, the last chunk is closed when the input + * stream itself is closed or the last byte is read (the first of the two) + */ + public class S3LazyInputStream extends InputStream { + private InputStream currentChunkStream; + private String objectKey; + private long endOfChunk = -1; + private long chunkMaxSize; + private long currPos = 0; + private long fileSize; + + public S3LazyInputStream(String objectKey, long chunkMaxSize, long fileSize) throws IOException { + this.objectKey = objectKey; + this.chunkMaxSize = chunkMaxSize; + this.endOfChunk = 0; + this.fileSize = fileSize; + downloadChunk(); + } + + @Override + public int read() throws IOException { + // is the current chunk completely read and other are available? + if (currPos == endOfChunk && currPos < fileSize) { + currentChunkStream.close(); + downloadChunk(); + } + + int byteRead = currPos < endOfChunk ? currentChunkStream.read() : -1; + // do we get any data or are we at the end of the file? + if (byteRead != -1) { + currPos++; + } else { + currentChunkStream.close(); + } + return byteRead; + } + + /** + * This method download the next chunk from S3 + * + * @throws IOException + * @throws FileNotFoundException + */ + private void downloadChunk() throws IOException, FileNotFoundException { + // Create a DownloadFileRequest with the desired byte range + long startByte = currPos; // Start byte (inclusive) + long endByte = Long.min(startByte + chunkMaxSize - 1, fileSize - 1); // End byte (inclusive) + GetObjectRequest getRequest = new GetObjectRequest(bucketName, objectKey) + .withRange(startByte, endByte); + + File currentChunkFile = File.createTempFile("s3-disk-copy-" + UUID.randomUUID(), "temp"); + currentChunkFile.deleteOnExit(); + try { + Download download = tm.download(getRequest, currentChunkFile); + download.waitForCompletion(); + currentChunkStream = new DeleteOnCloseFileInputStream(currentChunkFile); + endOfChunk = endOfChunk + download.getProgress().getBytesTransferred(); + } catch (AmazonClientException | InterruptedException e) { + currentChunkFile.delete(); + throw new IOException(e); + } + } + + @Override + public void close() throws IOException { + if (currentChunkStream != null) { + currentChunkStream.close(); + } + } + + } } diff --git a/dspace-api/src/test/java/org/dspace/storage/bitstore/S3BitStoreServiceIT.java b/dspace-api/src/test/java/org/dspace/storage/bitstore/S3BitStoreServiceIT.java index 30eeff2ddc8..6ea21eac8d6 100644 --- a/dspace-api/src/test/java/org/dspace/storage/bitstore/S3BitStoreServiceIT.java +++ b/dspace-api/src/test/java/org/dspace/storage/bitstore/S3BitStoreServiceIT.java @@ -99,6 +99,7 @@ public void setup() throws Exception { s3BitStoreService = new S3BitStoreService(amazonS3Client); s3BitStoreService.setEnabled(BooleanUtils.toBoolean( configurationService.getProperty("assetstore.s3.enabled"))); + s3BitStoreService.setBufferSize(22); context.turnOffAuthorisationSystem(); parentCommunity = CommunityBuilder.createCommunity(context) @@ -129,12 +130,25 @@ public void testBitstreamPutAndGetWithAlreadyPresentBucket() throws IOException assertThat(amazonS3Client.listBuckets(), contains(bucketNamed(bucketName))); context.turnOffAuthorisationSystem(); - String content = "Test bitstream content"; + String content = "Test bitstream content"; + String contentOverOneSpan = "This content span two chunks"; + String contentExactlyTwoSpans = "Test bitstream contentTest bitstream content"; + String contentOverOneTwoSpans = "Test bitstream contentThis content span three chunks"; Bitstream bitstream = createBitstream(content); + Bitstream bitstreamOverOneSpan = createBitstream(contentOverOneSpan); + Bitstream bitstreamExactlyTwoSpans = createBitstream(contentExactlyTwoSpans); + Bitstream bitstreamOverOneTwoSpans = createBitstream(contentOverOneTwoSpans); context.restoreAuthSystemState(); - s3BitStoreService.put(bitstream, toInputStream(content)); + checkGetPut(bucketName, content, bitstream); + checkGetPut(bucketName, contentOverOneSpan, bitstreamOverOneSpan); + checkGetPut(bucketName, contentExactlyTwoSpans, bitstreamExactlyTwoSpans); + checkGetPut(bucketName, contentOverOneTwoSpans, bitstreamOverOneTwoSpans); + + } + private void checkGetPut(String bucketName, String content, Bitstream bitstream) throws IOException { + s3BitStoreService.put(bitstream, toInputStream(content)); String expectedChecksum = Utils.toHex(generateChecksum(content)); assertThat(bitstream.getSizeBytes(), is((long) content.length())); @@ -147,7 +161,6 @@ public void testBitstreamPutAndGetWithAlreadyPresentBucket() throws IOException String key = s3BitStoreService.getFullKey(bitstream.getInternalId()); ObjectMetadata objectMetadata = amazonS3Client.getObjectMetadata(bucketName, key); assertThat(objectMetadata.getContentMD5(), is(expectedChecksum)); - } @Test