Skip to content

Commit

Permalink
v2.0.7
Browse files Browse the repository at this point in the history
  • Loading branch information
arnett, stu committed Nov 3, 2015
1 parent bdff87c commit c1ae2db
Show file tree
Hide file tree
Showing 9 changed files with 485 additions and 41 deletions.
113 changes: 75 additions & 38 deletions src/main/java/com/emc/object/s3/LargeFileUploader.java
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import com.emc.object.s3.bean.MultipartPartETag;
import com.emc.object.s3.request.*;
import com.emc.object.util.InputStreamSegment;
import com.emc.object.util.ProgressInputStream;
import com.emc.object.util.ProgressListener;
import com.emc.rest.util.SizedInputStream;
import org.apache.log4j.Logger;

Expand All @@ -47,6 +49,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

/**
* Convenience class to facilitate multipart upload for large files. This class will split the file
Expand Down Expand Up @@ -74,8 +77,9 @@ public class LargeFileUploader implements Runnable {
private Long partSize = DEFAULT_PART_SIZE;
private int threads = DEFAULT_THREADS;
private ExecutorService executorService;
private AtomicLong bytesTransferred = new AtomicLong();
private ProgressListener progressListener;

private long bytesTransferred;
private String eTag;

/**
Expand Down Expand Up @@ -113,22 +117,14 @@ public void doMultipartUpload() {
String uploadId = s3Client.initiateMultipartUpload(initRequest).getUploadId();

List<Future<MultipartPartETag>> futures = new ArrayList<Future<MultipartPartETag>>();
List<SizedInputStream> segmentStreams = new ArrayList<SizedInputStream>();
try {
// submit all upload tasks
int partNumber = 1;
long offset = 0, length = partSize;
while (offset < fullSize) {
if (offset + length > fullSize) length = fullSize - offset;

SizedInputStream segmentStream = file != null
? new InputStreamSegment(new FileInputStream(file), offset, length)
: new SizedInputStream(stream, length);
segmentStreams.add(segmentStream);

UploadPartRequest partRequest = new UploadPartRequest(bucket, key, uploadId, partNumber++, segmentStream);
partRequest.setContentLength(length);
futures.add(executorService.submit(new UploadPartTask(partRequest)));
futures.add(executorService.submit(new UploadPartTask(uploadId, partNumber++, offset, length)));

offset += length;
}
Expand All @@ -155,10 +151,6 @@ public void doMultipartUpload() {
if (e instanceof RuntimeException) throw (RuntimeException) e;
throw new RuntimeException("error during upload", e);
} finally {
for (SizedInputStream segmentStream : segmentStreams) {
bytesTransferred += segmentStream.getRead();
}

// make sure all spawned threads are shut down
executorService.shutdown();

Expand All @@ -184,22 +176,13 @@ public void doByteRangeUpload() {
s3Client.putObject(request);

List<Future<String>> futures = new ArrayList<Future<String>>();
List<SizedInputStream> segmentStreams = new ArrayList<SizedInputStream>();
try {
// submit all upload tasks
PutObjectRequest rangeRequest;
long offset = 0, length = partSize;
while (offset < fullSize) {
if (offset + length > fullSize) length = fullSize - offset;
Range range = Range.fromOffsetLength(offset, length);

SizedInputStream segmentStream = file != null
? new InputStreamSegment(new FileInputStream(file), offset, length)
: new SizedInputStream(stream, length);
segmentStreams.add(segmentStream);

rangeRequest = new PutObjectRequest(bucket, key, segmentStream).withRange(range);
futures.add(executorService.submit(new PutObjectTask(rangeRequest)));
futures.add(executorService.submit(new PutObjectTask(offset, length)));

offset += length;
}
Expand All @@ -219,10 +202,6 @@ public void doByteRangeUpload() {
if (e instanceof RuntimeException) throw (RuntimeException) e;
throw new RuntimeException("error during upload", e);
} finally {
for (SizedInputStream segmentStream : segmentStreams) {
bytesTransferred += segmentStream.getRead();
}

// make sure all spawned threads are shut down
executorService.shutdown();

Expand Down Expand Up @@ -300,7 +279,7 @@ public long getFullSize() {
}

public long getBytesTransferred() {
return bytesTransferred;
return bytesTransferred.get();
}

public String getETag() {
Expand Down Expand Up @@ -339,6 +318,14 @@ public void setCloseStream(boolean closeStream) {
this.closeStream = closeStream;
}

public ProgressListener getProgressListener() {
return progressListener;
}

public void setProgressListener(ProgressListener progressListener) {
this.progressListener = progressListener;
}

public long getPartSize() {
return partSize;
}
Expand Down Expand Up @@ -368,6 +355,14 @@ public ExecutorService getExecutorService() {
return executorService;
}

private void updateBytesTransferred(long count) {
long totalTransferred = bytesTransferred.addAndGet(count);

if(progressListener != null) {
progressListener.progress(totalTransferred, fullSize);
}
}

/**
* Allows for providing a custom thread executor (i.e. for custom thread factories). Note that if
* you set a custom executor service, the <code>threads</code> property will be ignored.
Expand Down Expand Up @@ -411,29 +406,71 @@ public LargeFileUploader withExecutorService(ExecutorService executorService) {
return this;
}

protected class UploadPartTask implements Callable<MultipartPartETag> {
private UploadPartRequest request;
public LargeFileUploader withProgressListener(ProgressListener progressListener) {
setProgressListener(progressListener);
return this;
}

private class UploadPartTask implements Callable<MultipartPartETag> {
private String uploadId;
private int partNumber;
private long offset;
private long length;

public UploadPartTask(UploadPartRequest request) {
this.request = request;
public UploadPartTask(String uploadId, int partNumber, long offset, long length) {
this.uploadId = uploadId;
this.partNumber = partNumber;
this.offset = offset;
this.length = length;
}

@Override
public MultipartPartETag call() throws Exception {
return s3Client.uploadPart(request);
SizedInputStream segmentStream;
if (file != null) {
segmentStream = new InputStreamSegment(new ProgressInputStream(new FileInputStream(file), progressListener), offset, length);
} else {
segmentStream = new SizedInputStream(new ProgressInputStream(stream, progressListener), length);
}

UploadPartRequest request = new UploadPartRequest(bucket, key, uploadId, partNumber++, segmentStream);
request.setContentLength(length);

MultipartPartETag etag = s3Client.uploadPart(request);
updateBytesTransferred(length);
return etag;
}
}

protected class PutObjectTask implements Callable<String> {
private PutObjectRequest request;
private long offset;
private long length;

public PutObjectTask(PutObjectRequest request) {
this.request = request;
public PutObjectTask(long offset, long length) {
this.offset = offset;
this.length = length;
}

@Override
public String call() throws Exception {
return s3Client.putObject(request).getETag();
Range range = Range.fromOffsetLength(offset, length);

SizedInputStream segmentStream = file != null
? new InputStreamSegment(new ProgressInputStream(new FileInputStream(file), progressListener),
offset, length) : new SizedInputStream(new ProgressInputStream(stream, progressListener),
length);

PutObjectRequest request = new PutObjectRequest(bucket, key, segmentStream).withRange(range);

String etag = s3Client.putObject(request).getETag();
long length = 0;
if(request.getRange() != null) {
length = request.getRange().getLast() - request.getRange().getFirst() + 1;
} else if(request.getContentLength() != null) {
length = request.getContentLength();
}
updateBytesTransferred(length);
return etag;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public ClientResponse handle(ClientRequest request) throws ClientHandlerExceptio

// if no identity is provided, this is an anonymous client
if (s3Config.getIdentity() != null) {
Map<String, String> parameters = RestUtil.getQueryParameterMap(request.getURI().getQuery());
Map<String, String> parameters = RestUtil.getQueryParameterMap(request.getURI().getRawQuery());
String resource = RestUtil.getEncodedPath(request.getURI());

// check if bucket is in hostname
Expand Down
63 changes: 63 additions & 0 deletions src/main/java/com/emc/object/util/ProgressInputStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2015, EMC Corporation.
* Redistribution and use in source and binary forms, with or without modification,
* are permitted provided that the following conditions are met:
*
* + Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* + Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* + The name of EMC Corporation may not be used to endorse or promote
* products derived from this software without specific prior written
* permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
* BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
package com.emc.object.util;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

/**
* InputStream wrapper class that fires events when data is read so realtime performance can be measured.
*/
public class ProgressInputStream extends FilterInputStream {
private final ProgressListener listener;

public ProgressInputStream(InputStream wrappedStream, ProgressListener listener) {
super(wrappedStream);
this.listener = listener;
}

@Override
public int read() throws IOException {
// This is a really, really bad idea.
throw new RuntimeException("No, I'm not going to let you kill performance.");
}

@Override
public int read(byte[] b) throws IOException {
int count = in.read(b);
if(listener != null) listener.transferred(count);
return count;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
int count = in.read(b, off, len);
if(listener != null) listener.transferred(count);
return count;
}
}
47 changes: 47 additions & 0 deletions src/main/java/com/emc/object/util/ProgressListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2015, EMC Corporation.
* Redistribution and use in source and binary forms, with or without modification,
* are permitted provided that the following conditions are met:
*
* + Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* + Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* + The name of EMC Corporation may not be used to endorse or promote
* products derived from this software without specific prior written
* permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
* BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
package com.emc.object.util;

/**
* Interface for reporting progress on large file transfers.
*/
public interface ProgressListener {
/**
* Provides feedback on the number of bytes completed and the total number of bytes to transfer.
* @param completed bytes completely transferred
* @param total total number of bytes to transfer
*/
void progress(long completed, long total);

/**
* Reports that some bytes have been transferred. This is a raw method that will be called frequently and can
* be used for computing current transfer rate. Note that if data is retried, the sum of this method's events
* may be more than the total object size. For reporting on percent complete, use the progress method instead.
* @param size number of bytes transferred
*/
void transferred(long size);
}
Loading

0 comments on commit c1ae2db

Please sign in to comment.