From 547a696ef891dfa59024bde240ab3ca35afe2dea Mon Sep 17 00:00:00 2001 From: Jonathan Zacsh Date: Tue, 19 Nov 2024 14:58:32 -0600 Subject: [PATCH 1/2] fix microsoft OOOM: don't stream into memory I'm fixing a bug here where the implementation of the microsoft adapter was reading an arbitrary stream into memory (using DataChunk class); instead we now "just stream directly to the endpoint". It's not _quite_ that simple though: we actually stream twice, for the very reason that the old implementation was streaming into memory: we need to know the filesize. While we're here I'm doing some "leave the campground better" tasks, so here's a more nitty-gritty breakdown of this PR: - microsoft adapter: marking duplicated code Microsoft{Photo,Video}* as needing to rely on newly refactored MicrosoftMedia (so these kinds of bug fixes are easier to maintain, for example). - DTP: make testability (via DI) easier for java.net.URL streamers (this has already become a pattern, so I just formalized it and dropped a TODO in the places that are doing this locally; this way, in the future it'll be easier/more obvious what "the pattern" is and how to stop maintaining disparate copies of it) - microsoft adapter: remainder of DataChunk code is now just a POJO, so switching to autovalue and letting the test live in the primary user: the new `StreamChunker` (as a mini "integrated" test). 
- microsoft adapter: all size-related operations are a `long` now --- .../flickr/media/FlickrMediaImporter.java | 2 + .../build.gradle | 8 +- .../transfer/microsoft/DataChunk.java | 80 ++++--------- .../microsoft/MicrosoftTransferExtension.java | 6 +- .../transfer/microsoft/StreamChunker.java | 42 +++++++ .../media/MicrosoftMediaImporter.java | 105 ++++++++++-------- .../photos/MicrosoftPhotosImporter.java | 83 ++++++++------ .../transfer/microsoft/DataChunkTest.java | 94 ---------------- .../transfer/microsoft/StreamChunkerTest.java | 92 +++++++++++++++ .../media/MicrosoftMediaExporterTest.java | 2 +- .../media/MicrosoftMediaImporterTest.java | 61 ++++++++-- .../photos/MicrosoftPhotosImporterTest.java | 58 ++++++++-- portability-spi-api/build.gradle | 3 +- .../transport/DiscardingStreamCounter.java | 20 ++++ .../spi/api/transport/FileStreamer.java | 9 ++ .../spi/api/transport/JobFileStream.java | 35 ++++++ .../spi/api/transport/RemoteFileStreamer.java | 11 ++ .../spi/api/transport/UrlGetStreamer.java | 46 ++++++++ .../storage/TemporaryPerJobDataStore.java | 8 ++ .../types/common/DownloadableItem.java | 7 +- 20 files changed, 518 insertions(+), 254 deletions(-) create mode 100644 extensions/data-transfer/portability-data-transfer-microsoft/src/main/java/org/datatransferproject/transfer/microsoft/StreamChunker.java delete mode 100644 extensions/data-transfer/portability-data-transfer-microsoft/src/test/java/org/datatransferproject/transfer/microsoft/DataChunkTest.java create mode 100644 extensions/data-transfer/portability-data-transfer-microsoft/src/test/java/org/datatransferproject/transfer/microsoft/StreamChunkerTest.java create mode 100644 portability-spi-api/src/main/java/org/datatransferproject/spi/api/transport/DiscardingStreamCounter.java create mode 100644 portability-spi-api/src/main/java/org/datatransferproject/spi/api/transport/FileStreamer.java create mode 100644 
portability-spi-api/src/main/java/org/datatransferproject/spi/api/transport/JobFileStream.java create mode 100644 portability-spi-api/src/main/java/org/datatransferproject/spi/api/transport/RemoteFileStreamer.java create mode 100644 portability-spi-api/src/main/java/org/datatransferproject/spi/api/transport/UrlGetStreamer.java diff --git a/extensions/data-transfer/portability-data-transfer-flickr/src/main/java/org/datatransferproject/datatransfer/flickr/media/FlickrMediaImporter.java b/extensions/data-transfer/portability-data-transfer-flickr/src/main/java/org/datatransferproject/datatransfer/flickr/media/FlickrMediaImporter.java index b80dc61da..877f8bb6f 100644 --- a/extensions/data-transfer/portability-data-transfer-flickr/src/main/java/org/datatransferproject/datatransfer/flickr/media/FlickrMediaImporter.java +++ b/extensions/data-transfer/portability-data-transfer-flickr/src/main/java/org/datatransferproject/datatransfer/flickr/media/FlickrMediaImporter.java @@ -318,6 +318,8 @@ private static String cleanString(String string) { return Strings.isNullOrEmpty(string) ? "" : string; } + // TODO migrate this testability-surface to newly shared JobFileStream and RemoteFileStreamer + // of org.datatransferproject.spi.api.transport package. 
@VisibleForTesting class ImageStreamProvider { diff --git a/extensions/data-transfer/portability-data-transfer-microsoft/build.gradle b/extensions/data-transfer/portability-data-transfer-microsoft/build.gradle index cd80a67dc..34cdd0dd7 100644 --- a/extensions/data-transfer/portability-data-transfer-microsoft/build.gradle +++ b/extensions/data-transfer/portability-data-transfer-microsoft/build.gradle @@ -21,20 +21,24 @@ plugins { dependencies { compile project(':portability-spi-cloud') compile project(':portability-spi-transfer') + compile project(':portability-spi-api') compile "com.squareup.okhttp3:okhttp:${okHttpVersion}" compile "com.squareup.okhttp3:logging-interceptor:${okHttpVersion}" compile("com.google.api-client:google-api-client:${googleApiClient}") + compileOnly "com.google.auto.value:auto-value-annotations:${autoValueVersion}" + annotationProcessor "com.google.auto.value:auto-value:${autoValueVersion}" + // REVIEW: We could standardize the version in gradle.propertoes but this would mean all dependent extensions must be revved at the same time compile 'com.googlecode.ez-vcard:ez-vcard:0.10.3' + testImplementation "org.mockito:mockito-inline:${mockitoInlineVersion}" + testCompile "org.mockito:mockito-core:${mockitoCoreVersion}" testCompile project(':extensions:auth:portability-auth-harness-microsoft') testCompile group: 'com.squareup.okhttp', name: 'mockwebserver', version: '2.7.5' testCompile group: 'com.squareup.okhttp3', name: 'mockwebserver', version: '3.2.0' testCompile("com.google.http-client:google-http-client-gson:${googleHttpClientVersion}") - testImplementation "org.mockito:mockito-inline:${mockitoVersion}" - } configurePublication(project) diff --git a/extensions/data-transfer/portability-data-transfer-microsoft/src/main/java/org/datatransferproject/transfer/microsoft/DataChunk.java b/extensions/data-transfer/portability-data-transfer-microsoft/src/main/java/org/datatransferproject/transfer/microsoft/DataChunk.java index 4715e092f..ca2cfbd23 
100644 --- a/extensions/data-transfer/portability-data-transfer-microsoft/src/main/java/org/datatransferproject/transfer/microsoft/DataChunk.java +++ b/extensions/data-transfer/portability-data-transfer-microsoft/src/main/java/org/datatransferproject/transfer/microsoft/DataChunk.java @@ -1,70 +1,38 @@ package org.datatransferproject.transfer.microsoft; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; +import com.google.auto.value.AutoValue; -/** - This utility class allows us to break up an InputStream into multiple chunks - for part-by-part upload to a service, for example to be consumed in an upload session. -*/ -public class DataChunk { - private static final int CHUNK_SIZE = 32000 * 1024; // 32000KiB +/** Describe small buffers of bytes captured from a large java.io Stream. */ +@AutoValue +public abstract class DataChunk { + /** Bytes being held in this buffer. */ + public abstract byte[] chunk(); - private final byte[] data; - private final int size; - private final int rangeStart; - public DataChunk(byte[] data, int size, int rangeStart) { - this.data = data; - this.size = size; - this.rangeStart = rangeStart; + /** Byte count of {@link chunk}. */ + public int size() { + return chunk().length; } - public int getSize() { - return size; - } + /** Index-offset within the original java.io Stream at which {@link chunk} had started. */ + public abstract long streamByteOffset(); - public byte[] getData() { - return data; + /** + * Index-offset within the original java.io Stream at which the final byte of {@link chunk} lived. 
+ */ + public long finalByteOffset() { + return streamByteOffset() + size() - 1; } - public int getStart() { - return rangeStart; + public static Builder builder() { + return new org.datatransferproject.transfer.microsoft.AutoValue_DataChunk.Builder(); } - public int getEnd() { - return rangeStart + size - 1; - } + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setChunk(byte[] value); - public static List splitData(InputStream inputStream) throws IOException { - ArrayList chunksToSend = new ArrayList(); - byte[] data = new byte[CHUNK_SIZE]; - int totalFileSize = 0; - int quantityToSend; - int roomLeft = CHUNK_SIZE; - int offset = 0; - int chunksRead = 0; + public abstract Builder setStreamByteOffset(long value); - // start timing - while ((quantityToSend = inputStream.read(data, offset, roomLeft)) != -1) { - offset += quantityToSend; - roomLeft -= quantityToSend; - if (roomLeft == 0) { - chunksToSend.add(new DataChunk(data, CHUNK_SIZE, chunksRead * CHUNK_SIZE)); - chunksRead++; - roomLeft = CHUNK_SIZE; - offset = 0; - totalFileSize += CHUNK_SIZE; - data = new byte[CHUNK_SIZE]; - } - } - if (offset != 0) { - chunksToSend.add(new DataChunk(data, offset, chunksRead * CHUNK_SIZE)); - totalFileSize += offset; - chunksRead++; - } - return chunksToSend; + public abstract DataChunk build(); } - -} \ No newline at end of file +} diff --git a/extensions/data-transfer/portability-data-transfer-microsoft/src/main/java/org/datatransferproject/transfer/microsoft/MicrosoftTransferExtension.java b/extensions/data-transfer/portability-data-transfer-microsoft/src/main/java/org/datatransferproject/transfer/microsoft/MicrosoftTransferExtension.java index 13fb7d894..d6988380b 100644 --- a/extensions/data-transfer/portability-data-transfer-microsoft/src/main/java/org/datatransferproject/transfer/microsoft/MicrosoftTransferExtension.java +++ 
b/extensions/data-transfer/portability-data-transfer-microsoft/src/main/java/org/datatransferproject/transfer/microsoft/MicrosoftTransferExtension.java @@ -15,6 +15,7 @@ import okhttp3.OkHttpClient; import org.datatransferproject.api.launcher.ExtensionContext; import org.datatransferproject.api.launcher.Monitor; +import org.datatransferproject.spi.api.transport.JobFileStream; import org.datatransferproject.spi.cloud.storage.AppCredentialStore; import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore; import org.datatransferproject.types.common.models.DataVertical; @@ -120,6 +121,7 @@ public void initialize(ExtensionContext context) { // Create the MicrosoftCredentialFactory with the given {@link AppCredentials}. MicrosoftCredentialFactory credentialFactory = new MicrosoftCredentialFactory(httpTransport, jsonFactory, appCredentials); + JobFileStream jobFileStream = new JobFileStream(); Monitor monitor = context.getMonitor(); @@ -132,9 +134,9 @@ public void initialize(ExtensionContext context) { new MicrosoftCalendarImporter(BASE_GRAPH_URL, client, mapper, transformerService)); importBuilder.put( PHOTOS, new MicrosoftPhotosImporter(BASE_GRAPH_URL, client, mapper, jobStore, monitor, - credentialFactory)); + credentialFactory, jobFileStream)); importBuilder.put(MEDIA, new MicrosoftMediaImporter(BASE_GRAPH_URL, client, mapper, jobStore, monitor, - credentialFactory)); + credentialFactory, jobFileStream)); importerMap = importBuilder.build(); ImmutableMap.Builder exporterBuilder = ImmutableMap.builder(); diff --git a/extensions/data-transfer/portability-data-transfer-microsoft/src/main/java/org/datatransferproject/transfer/microsoft/StreamChunker.java b/extensions/data-transfer/portability-data-transfer-microsoft/src/main/java/org/datatransferproject/transfer/microsoft/StreamChunker.java new file mode 100644 index 000000000..c0a1a3ee8 --- /dev/null +++ 
b/extensions/data-transfer/portability-data-transfer-microsoft/src/main/java/org/datatransferproject/transfer/microsoft/StreamChunker.java @@ -0,0 +1,42 @@ +package org.datatransferproject.transfer.microsoft; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Optional; + +/** + * Allows tracking reads across a stream. + * + *

Does not close the held input stream. + */ +public class StreamChunker { + private final int chunkSizeBytes; + private final InputStream inputStream; + + private long streamByteOffset = 0; + + public StreamChunker(int chunkSizeBytes, InputStream inputStream) { + this.inputStream = inputStream; + this.chunkSizeBytes = chunkSizeBytes; + } + + /** + * Constructs a new DataChunk from just {@code chunkSizeBytes} bytes of the stream. + * + *

Returned chunk will be less than or equal to chunkSizeBytes, or absent if no bytes were + * remaining in the stream. + */ + public Optional nextChunk() throws IOException { + byte[] chunkOfData = inputStream.readNBytes(chunkSizeBytes); + Optional resp = + chunkOfData.length == 0 + ? Optional.empty() + : Optional.of( + DataChunk.builder() + .setChunk(chunkOfData) + .setStreamByteOffset(streamByteOffset) + .build()); + streamByteOffset += chunkOfData.length; + return resp; + } +} diff --git a/extensions/data-transfer/portability-data-transfer-microsoft/src/main/java/org/datatransferproject/transfer/microsoft/media/MicrosoftMediaImporter.java b/extensions/data-transfer/portability-data-transfer-microsoft/src/main/java/org/datatransferproject/transfer/microsoft/media/MicrosoftMediaImporter.java index 8a7440c95..c5fffd043 100644 --- a/extensions/data-transfer/portability-data-transfer-microsoft/src/main/java/org/datatransferproject/transfer/microsoft/media/MicrosoftMediaImporter.java +++ b/extensions/data-transfer/portability-data-transfer-microsoft/src/main/java/org/datatransferproject/transfer/microsoft/media/MicrosoftMediaImporter.java @@ -15,6 +15,8 @@ */ package org.datatransferproject.transfer.microsoft.media; +import static org.datatransferproject.spi.api.transport.DiscardingStreamCounter.discardForLength; + import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.client.auth.oauth2.Credential; import com.google.common.base.Preconditions; @@ -22,11 +24,11 @@ import com.google.common.collect.ImmutableMap; import java.io.BufferedInputStream; import java.io.IOException; -import java.net.URL; +import java.io.InputStream; import java.util.Collection; import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import okhttp3.MediaType; import okhttp3.OkHttpClient; @@ -34,7 +36,11 @@ import okhttp3.RequestBody; import okhttp3.Response; import okhttp3.ResponseBody; +import 
org.apache.commons.lang3.tuple.Pair; import org.datatransferproject.api.launcher.Monitor; +import org.datatransferproject.spi.api.transport.JobFileStream; +import org.datatransferproject.spi.api.transport.RemoteFileStreamer; +import org.datatransferproject.spi.api.transport.UrlGetStreamer; import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore; import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutor; import org.datatransferproject.spi.transfer.provider.ImportResult; @@ -44,23 +50,25 @@ import org.datatransferproject.spi.transfer.types.PermissionDeniedException; import org.datatransferproject.transfer.microsoft.DataChunk; import org.datatransferproject.transfer.microsoft.MicrosoftTransmogrificationConfig; +import org.datatransferproject.transfer.microsoft.StreamChunker; import org.datatransferproject.transfer.microsoft.common.MicrosoftCredentialFactory; import org.datatransferproject.types.common.DownloadableFile; import org.datatransferproject.types.common.models.media.MediaAlbum; import org.datatransferproject.types.common.models.media.MediaContainerResource; import org.datatransferproject.types.transfer.auth.TokensAndUrlAuthData; -import org.apache.commons.lang3.tuple.Pair; -/** - * Imports albums with their photos and videos to OneDrive using the Microsoft Graph API. - */ +/** Imports albums with their photos and videos to OneDrive using the Microsoft Graph API. */ public class MicrosoftMediaImporter implements Importer { + /** Max number of bytes to upload to Microsoft's APIs at a time. 
*/ + private static final int MICROSOFT_UPLOAD_CHUNK_BYTE_SIZE = 32000 * 1024; // 32000KiB + private final OkHttpClient client; private final ObjectMapper objectMapper; private final TemporaryPerJobDataStore jobStore; private final Monitor monitor; private final MicrosoftCredentialFactory credentialFactory; + private final JobFileStream jobFileStream; private final MicrosoftTransmogrificationConfig transmogrificationConfig = new MicrosoftTransmogrificationConfig(); private Credential credential; @@ -73,7 +81,8 @@ public class MicrosoftMediaImporter public MicrosoftMediaImporter(String baseUrl, OkHttpClient client, ObjectMapper objectMapper, TemporaryPerJobDataStore jobStore, Monitor monitor, - MicrosoftCredentialFactory credentialFactory) { + MicrosoftCredentialFactory credentialFactory, + JobFileStream jobFileStream) { // NOTE: "special/photos" is a specific folder in One Drive that corresponds to items that // should appear in https://photos.onedrive.com/, for more information see: @@ -81,11 +90,10 @@ public MicrosoftMediaImporter(String baseUrl, OkHttpClient client, ObjectMapper createFolderUrl = baseUrl + "/v1.0/me/drive/special/photos/children"; albumlessMediaUrlTemplate = baseUrl + "/v1.0/me/drive/special/photos:/%s:/createUploadSession%s"; - + // first param is the folder id, second param is the file name // /me/drive/items/{parent-id}:/{filename}:/content; uploadMediaUrlTemplate = baseUrl + "/v1.0/me/drive/items/%s:/%s:/createUploadSession%s"; - this.client = client; this.objectMapper = objectMapper; @@ -93,6 +101,7 @@ public MicrosoftMediaImporter(String baseUrl, OkHttpClient client, ObjectMapper this.monitor = monitor; this.credentialFactory = credentialFactory; this.credential = null; + this.jobFileStream = jobFileStream; } @Override @@ -203,44 +212,51 @@ private void executeIdempotentImport( } private String importDownloadableItem( - DownloadableFile item, UUID jobId, - IdempotentImportExecutor idempotentImportExecutor) throws Exception { - 
BufferedInputStream inputStream = null; - if (item.isInTempStore()) { - inputStream = - new BufferedInputStream(jobStore.getStream(jobId, item.getFetchableUrl()).getStream()); - } else if (item.getFetchableUrl() != null) { - inputStream = new BufferedInputStream(new URL(item.getFetchableUrl()).openStream()); - } else { - throw new IllegalStateException("Don't know how to get the inputStream for " + item); + DownloadableFile item, UUID jobId, IdempotentImportExecutor idempotentImportExecutor) + throws Exception { + final long totalFileSize = discardForLength(jobFileStream.streamFile(item, jobId, jobStore)); + if (totalFileSize <= 0) { + throw new IOException(String.format( + "jobid %s hit empty unexpectedly empty (bytes=%d) download for file %s", + jobId, totalFileSize, item.getFetchableUrl())); } + InputStream fileStream = jobFileStream.streamFile(item, jobId, jobStore); String itemUploadUrl = createUploadSession(item, idempotentImportExecutor); - // Arrange the data to be uploaded in chunks - List chunksToSend = DataChunk.splitData(inputStream); - inputStream.close(); - final int totalFileSize = chunksToSend.stream().map(DataChunk::getSize).reduce(0, Integer::sum); - Preconditions.checkState( - chunksToSend.size() != 0, "Data was split into zero chunks %s.", item.getName()); - - Response chunkResponse = null; - for (DataChunk chunk : chunksToSend) { - chunkResponse = uploadChunk(chunk, itemUploadUrl, totalFileSize, item.getMimeType()); - } - if (chunkResponse.code() != 200 && chunkResponse.code() != 201) { + Response finalChunkResponse = + uploadStreamInChunks(totalFileSize, itemUploadUrl, item.getMimeType(), fileStream); + fileStream.close(); + if (finalChunkResponse.code() != 200 && finalChunkResponse.code() != 201) { // Once we upload the last chunk, we should have either 200 or 201. - // This should change to a precondition check after we debug some more. + // TODO: This should change to a precondition check after we debug some more. 
monitor.debug( - () -> "Received a bad code on completion of uploading chunks", chunkResponse.code()); + () -> "Received a bad code on completion of uploading chunks", finalChunkResponse.code()); } // get complete file response - ResponseBody chunkResponseBody = chunkResponse.body(); + ResponseBody finalChunkResponseBody = finalChunkResponse.body(); Map chunkResponseData = - objectMapper.readValue(chunkResponseBody.bytes(), Map.class); + objectMapper.readValue(finalChunkResponseBody.bytes(), Map.class); return (String) chunkResponseData.get("id"); } + /** Depletes input stream, uploading a chunk of the stream at a time. */ + private Response uploadStreamInChunks( + long totalFileSize, String itemUploadUrl, String itemMimeType, InputStream inputStream) throws IOException, DestinationMemoryFullException { + Response lastChunkResponse = null; + StreamChunker streamChunker = new StreamChunker(MICROSOFT_UPLOAD_CHUNK_BYTE_SIZE, inputStream); + Optional nextChunk; + while (true) { + nextChunk = streamChunker.nextChunk(); + if (nextChunk.isEmpty()) { + break; + } + lastChunkResponse = + uploadChunk(nextChunk.get(), itemUploadUrl, totalFileSize, itemMimeType); + } + return lastChunkResponse; + } + private Credential getOrCreateCredential(TokensAndUrlAuthData authData) { if (this.credential == null) { this.credential = this.credentialFactory.createCredential(authData); @@ -344,21 +360,21 @@ private Request.Builder buildCreateUploadSessionPath( // Content-Length: {chunk size in bytes} // Content-Range: bytes {begin}-{end}/{total size} // body={bytes} - private Response uploadChunk(DataChunk chunk, String photoUploadUrl, int totalFileSize, + private Response uploadChunk(DataChunk chunk, String photoUploadUrl, long totalFileSize, String mediaType) throws IOException, DestinationMemoryFullException { Request.Builder uploadRequestBuilder = new Request.Builder().url(photoUploadUrl); uploadRequestBuilder.header("Authorization", "Bearer " + credential.getAccessToken()); // put chunk 
data in RequestBody uploadChunkBody = - RequestBody.create(MediaType.parse(mediaType), chunk.getData(), 0, chunk.getSize()); + RequestBody.create(MediaType.parse(mediaType), chunk.chunk(), 0, chunk.size()); uploadRequestBuilder.put(uploadChunkBody); // set chunk data headers, indicating size and chunk range final String contentRange = - String.format("bytes %d-%d/%d", chunk.getStart(), chunk.getEnd(), totalFileSize); + String.format("bytes %d-%d/%d", chunk.streamByteOffset(), chunk.finalByteOffset(), totalFileSize); uploadRequestBuilder.header("Content-Range", contentRange); - uploadRequestBuilder.header("Content-Length", String.format("%d", chunk.getSize())); + uploadRequestBuilder.header("Content-Length", String.format("%d", chunk.size())); // upload the chunk Response chunkResponse = client.newCall(uploadRequestBuilder.build()).execute(); @@ -376,14 +392,15 @@ private Response uploadChunk(DataChunk chunk, String photoUploadUrl, int totalFi if (chunkCode == 507 && chunkResponse.message().contains("Insufficient Storage")) { throw new DestinationMemoryFullException("Microsoft destination storage limit reached", new IOException(String.format( - "Got error code %d with message: %s", chunkCode, chunkResponse.message()))); + "Got error HTTP status %d with message: %s", chunkCode, chunkResponse.message()))); } else if (chunkCode < 200 || chunkCode > 299) { - throw new IOException("Got error code: " + chunkCode + " message: " + chunkResponse.message() - + " body: " + chunkResponse.body().string()); + throw new IOException(String.format( + "Got error HTTP status: %d message: \"%s\" body: \"%s\"", chunkCode, chunkResponse.message(), + chunkResponse.body().string())); } else if (chunkCode == 200 || chunkCode == 201 || chunkCode == 202) { monitor.info(() - -> String.format("Uploaded chunk %s-%s successfuly, code %d", - chunk.getStart(), chunk.getEnd(), chunkCode)); + -> String.format("Uploaded chunk range %d-%d (of total bytesize: %d) successfuly, HTTP status %d", + 
chunk.streamByteOffset(), chunk.finalByteOffset(), totalFileSize, chunkCode)); } return chunkResponse; } diff --git a/extensions/data-transfer/portability-data-transfer-microsoft/src/main/java/org/datatransferproject/transfer/microsoft/photos/MicrosoftPhotosImporter.java b/extensions/data-transfer/portability-data-transfer-microsoft/src/main/java/org/datatransferproject/transfer/microsoft/photos/MicrosoftPhotosImporter.java index 569e0f952..1b9a7b307 100644 --- a/extensions/data-transfer/portability-data-transfer-microsoft/src/main/java/org/datatransferproject/transfer/microsoft/photos/MicrosoftPhotosImporter.java +++ b/extensions/data-transfer/portability-data-transfer-microsoft/src/main/java/org/datatransferproject/transfer/microsoft/photos/MicrosoftPhotosImporter.java @@ -15,17 +15,22 @@ */ package org.datatransferproject.transfer.microsoft.photos; +import static org.datatransferproject.spi.api.transport.DiscardingStreamCounter.discardForLength; + import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.client.auth.oauth2.Credential; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import java.io.BufferedInputStream; import java.io.IOException; +import java.io.InputStream; import java.net.URL; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import okhttp3.MediaType; import okhttp3.OkHttpClient; @@ -34,6 +39,9 @@ import okhttp3.Response; import okhttp3.ResponseBody; import org.datatransferproject.api.launcher.Monitor; +import org.datatransferproject.spi.api.transport.UrlGetStreamer; +import org.datatransferproject.spi.api.transport.JobFileStream; +import org.datatransferproject.spi.api.transport.RemoteFileStreamer; import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore; import 
org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutor; import org.datatransferproject.spi.transfer.provider.ImportResult; @@ -42,6 +50,7 @@ import org.datatransferproject.spi.transfer.types.DestinationMemoryFullException; import org.datatransferproject.spi.transfer.types.PermissionDeniedException; import org.datatransferproject.transfer.microsoft.DataChunk; +import org.datatransferproject.transfer.microsoft.StreamChunker; import org.datatransferproject.transfer.microsoft.MicrosoftTransmogrificationConfig; import org.datatransferproject.transfer.microsoft.common.MicrosoftCredentialFactory; import org.datatransferproject.types.common.models.photos.PhotoAlbum; @@ -52,14 +61,20 @@ /** * Imports albums and photos to OneDrive using the Microsoft Graph API. */ +// TODO de-dupe Microsoft{Media,Photos}Importer classes. The MediaImporter class is a direct fork +// of this one (when we thought we might delete this class). The Media fork fortunately has more +// generic model-agnostic internals, so it should be easier to call its factored-out code from this +// class. 
public class MicrosoftPhotosImporter implements Importer { + private static final int MICROSOFT_UPLOAD_CHUNK_BYTE_SIZE = 32000 * 1024; // 32000KiB private final OkHttpClient client; private final ObjectMapper objectMapper; private final TemporaryPerJobDataStore jobStore; private final Monitor monitor; private final MicrosoftCredentialFactory credentialFactory; + private final JobFileStream jobFileStream; private final MicrosoftTransmogrificationConfig transmogrificationConfig = new MicrosoftTransmogrificationConfig(); private Credential credential; @@ -76,7 +91,8 @@ public MicrosoftPhotosImporter( ObjectMapper objectMapper, TemporaryPerJobDataStore jobStore, Monitor monitor, - MicrosoftCredentialFactory credentialFactory) { + MicrosoftCredentialFactory credentialFactory, + JobFileStream jobFileStream) { createFolderUrl = baseUrl + "/v1.0/me/drive/special/photos/children"; // first param is the folder id, second param is the file name // /me/drive/items/{parent-id}:/{filename}:/content; @@ -90,6 +106,7 @@ public MicrosoftPhotosImporter( this.monitor = monitor; this.credentialFactory = credentialFactory; this.credential = null; + this.jobFileStream = jobFileStream; } @Override @@ -189,43 +206,45 @@ private String createOneDriveFolder(PhotoAlbum album) throws IOException, CopyEx } private String importSinglePhoto( - PhotoModel photo, + PhotoModel item, UUID jobId, IdempotentImportExecutor idempotentImportExecutor) throws Exception { - BufferedInputStream inputStream = null; - if (photo.isInTempStore()) { - inputStream = new BufferedInputStream(jobStore.getStream(jobId, photo.getFetchableUrl()).getStream()); - } else if (photo.getFetchableUrl() != null) { - inputStream = new BufferedInputStream(new URL(photo.getFetchableUrl()).openStream()); - } else { - throw new IllegalStateException("Don't know how to get the inputStream for " + photo); - } + final long totalFileSize = discardForLength(jobFileStream.streamFile(item, jobId, jobStore)); + InputStream fileStream = 
jobFileStream.streamFile(item, jobId, jobStore); - String photoUploadUrl = createUploadSession(photo, idempotentImportExecutor); - - // Arrange the data to be uploaded in chunks - List chunksToSend = DataChunk.splitData(inputStream); - inputStream.close(); - final int totalFileSize = chunksToSend.stream().map(DataChunk::getSize).reduce(0, Integer::sum); - Preconditions.checkState( - chunksToSend.size() != 0, "Data was split into zero chunks %s.", photo.getTitle()); + String itemUploadUrl = createUploadSession(item, idempotentImportExecutor); - Response chunkResponse = null; - for (DataChunk chunk : chunksToSend) { - chunkResponse = uploadChunk(chunk, photoUploadUrl, totalFileSize, photo.getMediaType()); - } - if (chunkResponse.code() != 200 && chunkResponse.code() != 201) { + Response finalChunkResponse = + uploadStreamInChunks(totalFileSize, itemUploadUrl, item.getMimeType(), fileStream); + fileStream.close(); + if (finalChunkResponse.code() != 200 && finalChunkResponse.code() != 201) { // Once we upload the last chunk, we should have either 200 or 201. - // This should change to a precondition check after we debug some more. + // TODO: This should change to a precondition check after we debug some more. monitor.debug( - () -> "Received a bad code on completion of uploading chunks", chunkResponse.code()); + () -> "Received a bad code on completion of uploading chunks", finalChunkResponse.code()); } // get complete file response - ResponseBody chunkResponseBody = chunkResponse.body(); - Map chunkResponseData = objectMapper.readValue(chunkResponseBody.bytes(), Map.class); + ResponseBody finalChunkResponseBody = finalChunkResponse.body(); + Map chunkResponseData = + objectMapper.readValue(finalChunkResponseBody.bytes(), Map.class); return (String) chunkResponseData.get("id"); } + /** Depletes input stream, uploading a chunk of the stream at a time. 
*/ + private Response uploadStreamInChunks( + long totalFileSize, String itemUploadUrl, String itemMimeType, InputStream inputStream) throws IOException, DestinationMemoryFullException { + Response lastChunkResponse = null; + StreamChunker streamChunker = new StreamChunker(MICROSOFT_UPLOAD_CHUNK_BYTE_SIZE, inputStream); + Optional nextChunk; + while (true) { + nextChunk = streamChunker.nextChunk(); + if (!nextChunk.isPresent()) break; + lastChunkResponse = + uploadChunk(nextChunk.get(), itemUploadUrl, totalFileSize, itemMimeType); + } + return lastChunkResponse; + } + private Credential getOrCreateCredential(TokensAndUrlAuthData authData) { if (this.credential == null) { this.credential = this.credentialFactory.createCredential(authData); @@ -324,21 +343,21 @@ private String createUploadSession(PhotoModel photo, IdempotentImportExecutor id // Content-Length: {chunk size in bytes} // Content-Range: bytes {begin}-{end}/{total size} // body={bytes} - private Response uploadChunk(DataChunk chunk, String photoUploadUrl, int totalFileSize, String mediaType) + private Response uploadChunk(DataChunk chunk, String photoUploadUrl, long totalFileSize, String mediaType) throws IOException, DestinationMemoryFullException { Request.Builder uploadRequestBuilder = new Request.Builder().url(photoUploadUrl); uploadRequestBuilder.header("Authorization", "Bearer " + credential.getAccessToken()); // put chunk data in - RequestBody uploadChunkBody = RequestBody.create(MediaType.parse(mediaType), chunk.getData(), 0, chunk.getSize()); + RequestBody uploadChunkBody = RequestBody.create(MediaType.parse(mediaType), chunk.chunk(), 0, chunk.size()); uploadRequestBuilder.put(uploadChunkBody); // set chunk data headers, indicating size and chunk range final String contentRange = - String.format("bytes %d-%d/%d", chunk.getStart(), chunk.getEnd(), totalFileSize); + String.format("bytes %d-%d/%d", chunk.streamByteOffset(), chunk.finalByteOffset(), totalFileSize); 
uploadRequestBuilder.header("Content-Range", contentRange); - uploadRequestBuilder.header("Content-Length", String.format("%d", chunk.getSize())); + uploadRequestBuilder.header("Content-Length", String.format("%d", chunk.size())); // upload the chunk Response chunkResponse = client.newCall(uploadRequestBuilder.build()).execute(); @@ -372,7 +391,7 @@ private Response uploadChunk(DataChunk chunk, String photoUploadUrl, int totalFi () -> String.format( "Uploaded chunk %s-%s successfuly, code %d", - chunk.getStart(), chunk.getEnd(), chunkCode)); + chunk.streamByteOffset(), chunk.finalByteOffset(), chunkCode)); } return chunkResponse; } diff --git a/extensions/data-transfer/portability-data-transfer-microsoft/src/test/java/org/datatransferproject/transfer/microsoft/DataChunkTest.java b/extensions/data-transfer/portability-data-transfer-microsoft/src/test/java/org/datatransferproject/transfer/microsoft/DataChunkTest.java deleted file mode 100644 index 49bacc023..000000000 --- a/extensions/data-transfer/portability-data-transfer-microsoft/src/test/java/org/datatransferproject/transfer/microsoft/DataChunkTest.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright 2019 The Data Transfer Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.datatransferproject.transfer.microsoft.photos; - -import static com.google.common.truth.Truth.assertThat; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.List; -import org.datatransferproject.transfer.microsoft.DataChunk; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - - -/** */ -public class DataChunkTest { - - private static final int CHUNK_SIZE = 32000 * 1024; // 32000KiB - - InputStream inputStream; - - @BeforeEach - public void setUp() throws IOException { - } - - @Test - public void testSplitDataSingleFullChunk() throws IOException { - inputStream = new ByteArrayInputStream(new byte[CHUNK_SIZE]); - List l = DataChunk.splitData(inputStream); - assertThat(l).hasSize(1); - assertThat(l.get(0).getSize()).isEqualTo(CHUNK_SIZE); - assertThat(l.get(0).getStart()).isEqualTo(0); - assertThat(l.get(0).getEnd()).isEqualTo(CHUNK_SIZE - 1); - } - - @Test - public void testSplitDataSingleNotFullChunk() throws IOException { - inputStream = new ByteArrayInputStream(new byte[CHUNK_SIZE-1]); - List l = DataChunk.splitData(inputStream); - assertThat(l).hasSize(1); - assertThat(l.get(0).getSize()).isEqualTo(CHUNK_SIZE - 1); - assertThat(l.get(0).getStart()).isEqualTo(0); - assertThat(l.get(0).getEnd()).isEqualTo(CHUNK_SIZE - 2); - } - - @Test - public void testSplitDataEmpty() throws IOException { - inputStream = new ByteArrayInputStream(new byte[0]); - List l = DataChunk.splitData(inputStream); - assertThat(l).hasSize(0); - } - - @Test - public void testSplitTwoEvenChunks() throws IOException { - inputStream = new ByteArrayInputStream(new byte[CHUNK_SIZE*2]); - List l = DataChunk.splitData(inputStream); - assertThat(l).hasSize(2); - assertThat(l.get(0).getSize()).isEqualTo(CHUNK_SIZE); - assertThat(l.get(0).getStart()).isEqualTo(0); - assertThat(l.get(0).getEnd()).isEqualTo(CHUNK_SIZE - 1); - assertThat(l.get(1).getSize()).isEqualTo(CHUNK_SIZE); - 
assertThat(l.get(1).getStart()).isEqualTo(CHUNK_SIZE); - assertThat(l.get(1).getEnd()).isEqualTo(2*CHUNK_SIZE - 1); - } - - @Test - public void testSplitTwoChunksUneven() throws IOException { - inputStream = new ByteArrayInputStream(new byte[CHUNK_SIZE*2 - 10]); - List l = DataChunk.splitData(inputStream); - assertThat(l).hasSize(2); - assertThat(l.get(0).getSize()).isEqualTo(CHUNK_SIZE); - assertThat(l.get(0).getStart()).isEqualTo(0); - assertThat(l.get(0).getEnd()).isEqualTo(CHUNK_SIZE - 1); - assertThat(l.get(1).getSize()).isEqualTo(CHUNK_SIZE - 10); - assertThat(l.get(1).getStart()).isEqualTo(CHUNK_SIZE); - assertThat(l.get(1).getEnd()).isEqualTo(2*CHUNK_SIZE - 11); - } - -} diff --git a/extensions/data-transfer/portability-data-transfer-microsoft/src/test/java/org/datatransferproject/transfer/microsoft/StreamChunkerTest.java b/extensions/data-transfer/portability-data-transfer-microsoft/src/test/java/org/datatransferproject/transfer/microsoft/StreamChunkerTest.java new file mode 100644 index 000000000..777571ce5 --- /dev/null +++ b/extensions/data-transfer/portability-data-transfer-microsoft/src/test/java/org/datatransferproject/transfer/microsoft/StreamChunkerTest.java @@ -0,0 +1,92 @@ +package org.datatransferproject.transfer.microsoft.photos; + +import static com.google.common.truth.Truth.assertThat; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.Optional; +import org.datatransferproject.transfer.microsoft.DataChunk; +import org.datatransferproject.transfer.microsoft.StreamChunker; +import org.junit.jupiter.api.Test; + +public class StreamChunkerTest { + private static final int TEST_CHUNK_SIZE = 32000 * 1024; // 32000KiB + + @Test + public void testSplitDataSingleFullChunk() throws IOException { + StreamChunker streamChunker = + new StreamChunker(TEST_CHUNK_SIZE, new ByteArrayInputStream(new byte[TEST_CHUNK_SIZE])); + + Optional l = streamChunker.nextChunk(); + assertThat(l.isPresent()).isTrue(); + 
assertThat(l.get().size()).isEqualTo(TEST_CHUNK_SIZE); + assertThat(l.get().streamByteOffset()).isEqualTo(0); + assertThat(l.get().finalByteOffset()).isEqualTo(TEST_CHUNK_SIZE - 1); + + assertThat(streamChunker.nextChunk().isEmpty()).isTrue(); + } + + @Test + public void testSplitDataSingleNotFullChunk() throws IOException { + StreamChunker streamChunker = + new StreamChunker(TEST_CHUNK_SIZE, new ByteArrayInputStream(new byte[TEST_CHUNK_SIZE - 1])); + + Optional l = streamChunker.nextChunk(); + assertThat(l.isPresent()).isTrue(); + assertThat(l.get().size()).isEqualTo(TEST_CHUNK_SIZE - 1); + assertThat(l.get().streamByteOffset()).isEqualTo(0); + assertThat(l.get().finalByteOffset()).isEqualTo(TEST_CHUNK_SIZE - 2); + + assertThat(streamChunker.nextChunk().isEmpty()).isTrue(); + } + + @Test + public void testSplitDataEmpty() throws IOException { + StreamChunker streamChunker = + new StreamChunker(TEST_CHUNK_SIZE, new ByteArrayInputStream(new byte[0])); + + Optional l = streamChunker.nextChunk(); + assertThat(l.isEmpty()).isTrue(); + } + + @Test + public void testSplitTwoEvenChunks() throws IOException { + StreamChunker streamChunker = + new StreamChunker(TEST_CHUNK_SIZE, new ByteArrayInputStream(new byte[TEST_CHUNK_SIZE * 2])); + + Optional l = streamChunker.nextChunk(); + assertThat(l.isPresent()).isTrue(); + assertThat(l.get().size()).isEqualTo(TEST_CHUNK_SIZE); + assertThat(l.get().streamByteOffset()).isEqualTo(0); + assertThat(l.get().finalByteOffset()).isEqualTo(TEST_CHUNK_SIZE - 1); + + l = streamChunker.nextChunk(); + assertThat(l.isPresent()).isTrue(); + assertThat(l.get().size()).isEqualTo(TEST_CHUNK_SIZE); + assertThat(l.get().streamByteOffset()).isEqualTo(TEST_CHUNK_SIZE); + assertThat(l.get().finalByteOffset()).isEqualTo(2 * TEST_CHUNK_SIZE - 1); + + assertThat(streamChunker.nextChunk().isEmpty()).isTrue(); + } + + @Test + public void testSplitTwoChunksUneven() throws IOException { + StreamChunker streamChunker = + new StreamChunker( + TEST_CHUNK_SIZE, 
new ByteArrayInputStream(new byte[TEST_CHUNK_SIZE * 2 - 10])); + + Optional l = streamChunker.nextChunk(); + assertThat(l.isPresent()).isTrue(); + assertThat(l.get().size()).isEqualTo(TEST_CHUNK_SIZE); + assertThat(l.get().streamByteOffset()).isEqualTo(0); + assertThat(l.get().finalByteOffset()).isEqualTo(TEST_CHUNK_SIZE - 1); + + l = streamChunker.nextChunk(); + assertThat(l.isPresent()).isTrue(); + assertThat(l.get().size()).isEqualTo(TEST_CHUNK_SIZE - 10); + assertThat(l.get().streamByteOffset()).isEqualTo(TEST_CHUNK_SIZE); + assertThat(l.get().finalByteOffset()).isEqualTo(2 * TEST_CHUNK_SIZE - 11); + + assertThat(streamChunker.nextChunk().isEmpty()).isTrue(); + } +} diff --git a/extensions/data-transfer/portability-data-transfer-microsoft/src/test/java/org/datatransferproject/transfer/microsoft/media/MicrosoftMediaExporterTest.java b/extensions/data-transfer/portability-data-transfer-microsoft/src/test/java/org/datatransferproject/transfer/microsoft/media/MicrosoftMediaExporterTest.java index 94cbf462c..024ec1902 100644 --- a/extensions/data-transfer/portability-data-transfer-microsoft/src/test/java/org/datatransferproject/transfer/microsoft/media/MicrosoftMediaExporterTest.java +++ b/extensions/data-transfer/portability-data-transfer-microsoft/src/test/java/org/datatransferproject/transfer/microsoft/media/MicrosoftMediaExporterTest.java @@ -36,8 +36,8 @@ import org.datatransferproject.types.common.StringPaginationToken; import org.datatransferproject.types.common.models.ContainerResource; import org.datatransferproject.types.common.models.IdOnlyContainerResource; -import org.datatransferproject.types.common.models.media.MediaContainerResource; import org.datatransferproject.types.common.models.media.MediaAlbum; +import org.datatransferproject.types.common.models.media.MediaContainerResource; import org.datatransferproject.types.common.models.photos.PhotoModel; import org.datatransferproject.types.common.models.videos.VideoModel; import 
org.datatransferproject.types.transfer.auth.TokensAndUrlAuthData; diff --git a/extensions/data-transfer/portability-data-transfer-microsoft/src/test/java/org/datatransferproject/transfer/microsoft/media/MicrosoftMediaImporterTest.java b/extensions/data-transfer/portability-data-transfer-microsoft/src/test/java/org/datatransferproject/transfer/microsoft/media/MicrosoftMediaImporterTest.java index d72865626..cb636560b 100644 --- a/extensions/data-transfer/portability-data-transfer-microsoft/src/test/java/org/datatransferproject/transfer/microsoft/media/MicrosoftMediaImporterTest.java +++ b/extensions/data-transfer/portability-data-transfer-microsoft/src/test/java/org/datatransferproject/transfer/microsoft/media/MicrosoftMediaImporterTest.java @@ -16,6 +16,7 @@ package org.datatransferproject.transfer.microsoft.media; +import static com.google.common.base.Preconditions.checkState; import static com.google.common.truth.Truth.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; @@ -33,6 +34,7 @@ import com.google.common.collect.ImmutableList; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.UUID; import okhttp3.Call; @@ -44,6 +46,8 @@ import okio.Buffer; import org.datatransferproject.api.launcher.Monitor; import org.datatransferproject.launcher.monitor.ConsoleMonitor; +import org.datatransferproject.spi.api.transport.JobFileStream; +import org.datatransferproject.spi.api.transport.RemoteFileStreamer; import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore; import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore.InputStreamWrapper; import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutor; @@ -51,12 +55,15 @@ import org.datatransferproject.spi.transfer.types.PermissionDeniedException; import 
org.datatransferproject.test.types.FakeIdempotentImportExecutor; import org.datatransferproject.transfer.microsoft.common.MicrosoftCredentialFactory; +import org.datatransferproject.types.common.DownloadableFile; import org.datatransferproject.types.common.models.media.MediaAlbum; import org.datatransferproject.types.common.models.media.MediaContainerResource; import org.datatransferproject.types.common.models.photos.PhotoModel; import org.datatransferproject.types.transfer.auth.TokensAndUrlAuthData; import org.junit.Before; import org.junit.Test; +import org.mockito.stubbing.Answer; +import org.mockito.invocation.InvocationOnMock; /** * This tests the MicrosoftMediaImporter. As of now, it only tests the number of requests called. @@ -77,6 +84,7 @@ public class MicrosoftMediaImporterTest { MicrosoftCredentialFactory credentialFactory; IdempotentImportExecutor executor; TokensAndUrlAuthData authData; + RemoteFileStreamer remoteFileStreamer; @Before public void setUp() throws IOException { @@ -90,12 +98,14 @@ public void setUp() throws IOException { monitor = new ConsoleMonitor(ConsoleMonitor.Level.INFO); credentialFactory = mock(MicrosoftCredentialFactory.class); credential = new Credential.Builder(BearerToken.authorizationHeaderAccessMethod()).build(); + RemoteFileStreamer remoteFileStreamer = mock(RemoteFileStreamer.class); when(credentialFactory.createCredential(any())).thenReturn(credential); when(credentialFactory.refreshCredential(any())).thenReturn(credential); credential.setAccessToken("acc"); credential.setExpirationTimeMilliseconds(null); - importer = new MicrosoftMediaImporter( - BASE_URL, client, objectMapper, jobStore, monitor, credentialFactory); + importer = + new MicrosoftMediaImporter( + BASE_URL, client, objectMapper, jobStore, monitor, credentialFactory, new JobFileStream(remoteFileStreamer)); } @Test @@ -170,19 +180,50 @@ public void testImportItemPermissionDenied() throws Exception { }); } + private static M fakeJobstoreModel( + 
TemporaryPerJobDataStore jobStore, + M model, + byte[] contents) throws IOException { + checkState( + model.isInTempStore(), + "nonsensical fake: trying to fake a tempstore entry with isInTempStore() set to false"); + when(jobStore.getStream(uuid, model.getFetchableUrl())) .thenAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) { + return new InputStreamWrapper(new ByteArrayInputStream(contents)); + } + }); + return model; + } + @Test public void testImportItemAllSuccess() throws Exception { List albums = ImmutableList.of(new MediaAlbum("id1", "albumb1", "This is a fake albumb")); - List photos = ImmutableList.of( - new PhotoModel("Pic1", "http://fake.com/1.jpg", "A pic", "image/jpg", "p1", "id1", true), - new PhotoModel( - "Pic2", "https://fake.com/2.png", "fine art", "image/png", "p2", "id1", true)); - when(jobStore.getStream(uuid, "http://fake.com/1.jpg")) - .thenReturn(new InputStreamWrapper(new ByteArrayInputStream(new byte[CHUNK_SIZE]))); - when(jobStore.getStream(uuid, "https://fake.com/2.png")) - .thenReturn(new InputStreamWrapper(new ByteArrayInputStream(new byte[CHUNK_SIZE]))); + Collection photos = + ImmutableList.of( + fakeJobstoreModel( + jobStore, + new PhotoModel( + "Pic1", + "http://fake.com/1.jpg", + "A pic", + "image/jpg", + "p1", // dataId + "id1", // albumId + true /*isInTempStore*/), + new byte[CHUNK_SIZE]), + fakeJobstoreModel( + jobStore, + new PhotoModel( + "Pic2", + "http://fake.com/2.png", + "fine art", + "image/png", + "p2", // dataId + "id1", // albumId + true /*isInTempStore*/), + new byte[CHUNK_SIZE])); MediaContainerResource data = new MediaContainerResource(albums, photos, null /*videos*/); Call call = mock(Call.class); diff --git a/extensions/data-transfer/portability-data-transfer-microsoft/src/test/java/org/datatransferproject/transfer/microsoft/photos/MicrosoftPhotosImporterTest.java
b/extensions/data-transfer/portability-data-transfer-microsoft/src/test/java/org/datatransferproject/transfer/microsoft/photos/MicrosoftPhotosImporterTest.java index 63fc36712..6495e02ad 100644 --- a/extensions/data-transfer/portability-data-transfer-microsoft/src/test/java/org/datatransferproject/transfer/microsoft/photos/MicrosoftPhotosImporterTest.java +++ b/extensions/data-transfer/portability-data-transfer-microsoft/src/test/java/org/datatransferproject/transfer/microsoft/photos/MicrosoftPhotosImporterTest.java @@ -16,6 +16,7 @@ package org.datatransferproject.transfer.microsoft.photos; +import static com.google.common.base.Preconditions.checkState; import static com.google.common.truth.Truth.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; @@ -44,6 +45,8 @@ import okio.Buffer; import org.datatransferproject.api.launcher.Monitor; import org.datatransferproject.launcher.monitor.ConsoleMonitor; +import org.datatransferproject.spi.api.transport.JobFileStream; +import org.datatransferproject.spi.api.transport.RemoteFileStreamer; import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore; import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore.InputStreamWrapper; import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutor; @@ -51,12 +54,15 @@ import org.datatransferproject.spi.transfer.types.PermissionDeniedException; import org.datatransferproject.test.types.FakeIdempotentImportExecutor; import org.datatransferproject.transfer.microsoft.common.MicrosoftCredentialFactory; +import org.datatransferproject.types.common.DownloadableFile; import org.datatransferproject.types.common.models.photos.PhotoAlbum; import org.datatransferproject.types.common.models.photos.PhotoModel; import org.datatransferproject.types.common.models.photos.PhotosContainerResource; import org.datatransferproject.types.transfer.auth.TokensAndUrlAuthData; import 
org.junit.Before; import org.junit.Test; +import org.mockito.stubbing.Answer; +import org.mockito.invocation.InvocationOnMock; /** @@ -78,6 +84,7 @@ public class MicrosoftPhotosImporterTest { MicrosoftCredentialFactory credentialFactory; IdempotentImportExecutor executor; TokensAndUrlAuthData authData; + JobFileStream jobFileStream; @Before @@ -96,13 +103,16 @@ public void setUp() throws IOException { when(credentialFactory.refreshCredential(any())).thenReturn(credential); credential.setAccessToken("acc"); credential.setExpirationTimeMilliseconds(null); + RemoteFileStreamer remoteFileStreamer = mock(RemoteFileStreamer.class); + JobFileStream jobFileStream = new JobFileStream(remoteFileStreamer); importer = new MicrosoftPhotosImporter( BASE_URL, client, objectMapper, jobStore, monitor, - credentialFactory + credentialFactory, + jobFileStream ); } @@ -172,6 +182,21 @@ public void testImportItemPermissionDenied() throws Exception { }); } + private static M fakeJobstoreModel( + TemporaryPerJobDataStore jobStore, + M model, + byte[] contents) throws IOException { + checkState( + model.isInTempStore(), + "nonsensical fake: trying to fake a tempstore entry with isInTempStore() set to false"); + when(jobStore.getStream(uuid, model.getFetchableUrl())) .thenAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) { + return new InputStreamWrapper(new ByteArrayInputStream(contents)); + } + }); + return model; + } + @Test public void testImportItemAllSuccess() throws Exception { List albums = @@ -179,15 +204,28 @@ public void testImportItemAllSuccess() throws Exception { List photos = ImmutableList.of( - new PhotoModel( - "Pic1", "http://fake.com/1.jpg", "A pic", "image/jpg", "p1", "id1", - true), - new PhotoModel( - "Pic2", "https://fake.com/2.png", "fine art", "image/png", "p2", "id1", true)); - when(jobStore.getStream(uuid, "http://fake.com/1.jpg")) - .thenReturn(new InputStreamWrapper(new ByteArrayInputStream(new byte[CHUNK_SIZE]))); - 
when(jobStore.getStream(uuid, "https://fake.com/2.png")) - .thenReturn(new InputStreamWrapper(new ByteArrayInputStream(new byte[CHUNK_SIZE]))); + fakeJobstoreModel( + jobStore, + new PhotoModel( + "Pic1", + "http://fake.com/1.jpg", + "A pic", + "image/jpg", + "p1", // dataId + "id1", // albumId + true /*isInTempStore*/), + new byte[CHUNK_SIZE]), + fakeJobstoreModel( + jobStore, + new PhotoModel( + "Pic2", + "https://fake.com/2.png", + "fine art", + "image/png", + "p2", // dataId + "id1", // albumId + true /*isInTempStore*/), + new byte[CHUNK_SIZE])); PhotosContainerResource data = new PhotosContainerResource(albums, photos); Call call = mock(Call.class); diff --git a/portability-spi-api/build.gradle b/portability-spi-api/build.gradle index d819faaa7..becd9e4ae 100644 --- a/portability-spi-api/build.gradle +++ b/portability-spi-api/build.gradle @@ -22,6 +22,7 @@ plugins { dependencies { compile project(':portability-types-common') compile project(':portability-types-transfer') + compile project(':portability-spi-cloud') compile project(':portability-api-launcher') } -configurePublication(project) \ No newline at end of file +configurePublication(project) diff --git a/portability-spi-api/src/main/java/org/datatransferproject/spi/api/transport/DiscardingStreamCounter.java b/portability-spi-api/src/main/java/org/datatransferproject/spi/api/transport/DiscardingStreamCounter.java new file mode 100644 index 000000000..c8d2081e3 --- /dev/null +++ b/portability-spi-api/src/main/java/org/datatransferproject/spi/api/transport/DiscardingStreamCounter.java @@ -0,0 +1,20 @@ +package org.datatransferproject.spi.api.transport; + +import com.google.common.io.CountingInputStream; +import java.io.IOException; +import java.io.InputStream; + +public class DiscardingStreamCounter { + private DiscardingStreamCounter() {} + + /** Returns byte size of stream, discarding its contents and closing the stream. 
*/ + public static long discardForLength(InputStream stream) throws IOException { + CountingInputStream counter = new CountingInputStream(stream); + while (true) { + if (counter.skip(Integer.MAX_VALUE) < Integer.MAX_VALUE) { + counter.close(); + return counter.getCount(); + } + } + } +} diff --git a/portability-spi-api/src/main/java/org/datatransferproject/spi/api/transport/FileStreamer.java b/portability-spi-api/src/main/java/org/datatransferproject/spi/api/transport/FileStreamer.java new file mode 100644 index 000000000..e19b46308 --- /dev/null +++ b/portability-spi-api/src/main/java/org/datatransferproject/spi/api/transport/FileStreamer.java @@ -0,0 +1,9 @@ +package org.datatransferproject.spi.api.transport; + +import java.io.IOException; +import java.io.InputStream; +import org.datatransferproject.types.common.DownloadableItem; + +public interface FileStreamer { + InputStream get(DownloadableItem downloadableItem) throws IOException; +} diff --git a/portability-spi-api/src/main/java/org/datatransferproject/spi/api/transport/JobFileStream.java b/portability-spi-api/src/main/java/org/datatransferproject/spi/api/transport/JobFileStream.java new file mode 100644 index 000000000..47a952964 --- /dev/null +++ b/portability-spi-api/src/main/java/org/datatransferproject/spi/api/transport/JobFileStream.java @@ -0,0 +1,35 @@ +package org.datatransferproject.spi.api.transport; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.io.InputStream; +import java.util.UUID; +import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore; +import org.datatransferproject.types.common.DownloadableFile; + +public class JobFileStream { + private final RemoteFileStreamer remoteFileStreamer; + + @VisibleForTesting + public JobFileStream(RemoteFileStreamer remoteFileStreamer) { + this.remoteFileStreamer = remoteFileStreamer; + } + + public JobFileStream() { + 
this(new UrlGetStreamer()); + } + + /** Streams a file from wherever it lives, making no attempt to tee into jobStore. */ + public InputStream streamFile( + DownloadableFile downloadableFile, UUID jobId, TemporaryPerJobDataStore jobStore) + throws IOException { + checkState(downloadableFile.getFetchableUrl() != null, "missing fetchable URL for file"); + if (downloadableFile.isInTempStore()) { + return jobStore.getStream(jobId, downloadableFile.getFetchableUrl()).getStream(); + } else { + return this.remoteFileStreamer.get(downloadableFile); + } + } +} diff --git a/portability-spi-api/src/main/java/org/datatransferproject/spi/api/transport/RemoteFileStreamer.java b/portability-spi-api/src/main/java/org/datatransferproject/spi/api/transport/RemoteFileStreamer.java new file mode 100644 index 000000000..afca49010 --- /dev/null +++ b/portability-spi-api/src/main/java/org/datatransferproject/spi/api/transport/RemoteFileStreamer.java @@ -0,0 +1,11 @@ +package org.datatransferproject.spi.api.transport; + +import java.io.IOException; +import java.io.InputStream; +import org.datatransferproject.types.common.DownloadableItem; + +public interface RemoteFileStreamer { + public InputStream get(String remoteUrl) throws IOException; + + public InputStream get(DownloadableItem downloadableItem) throws IOException; +} diff --git a/portability-spi-api/src/main/java/org/datatransferproject/spi/api/transport/UrlGetStreamer.java b/portability-spi-api/src/main/java/org/datatransferproject/spi/api/transport/UrlGetStreamer.java new file mode 100644 index 000000000..7b41c5e15 --- /dev/null +++ b/portability-spi-api/src/main/java/org/datatransferproject/spi/api/transport/UrlGetStreamer.java @@ -0,0 +1,46 @@ +package org.datatransferproject.spi.api.transport; + +import static com.google.common.base.Preconditions.checkState; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URI; +import 
java.net.URISyntaxException; +import java.net.URL; +import org.datatransferproject.types.common.DownloadableItem; + +/** Implements a simple HTTP GET against a URL to stream the results. */ +// TODO(techdebt) update Smugmug's codebase to implement their own, since they have special +// getImageAsStream impl that isn't a simple GET: +// https://github.com/dtinit/data-transfer-project/blob/9723399b5b4a66ab431822b2a95f45e6d3380b32/extensions/data-transfer/portability-data-transfer-smugmug/src/main/java/org/datatransferproject/transfer/smugmug/photos/SmugMugPhotosImporter.java#L136 +// This will let the DTP codebase share test patterns across adapters. +public class UrlGetStreamer implements RemoteFileStreamer { + @Override + public InputStream get(String remoteUrl) throws IOException { + return new BufferedInputStream(toUrl(remoteUrl).openStream()); + } + + @Override + public InputStream get(DownloadableItem downloadableItem) throws IOException { + checkState( + downloadableItem.getFetchableUrl() != null, + "trying to download incomplete DownloadableItem: missing fetchable URL"); + checkState( + !downloadableItem.isInTempStore(), + "trying to re-download an already stored item: \"%s\"", + downloadableItem.getFetchableUrl()); + + return get(downloadableItem.getFetchableUrl()); + } + + /** Easily construct a {@link java.net.URL} while mapping to the exceptions DTP needs. 
*/ + private static URL toUrl(String url) throws IOException { + try { + return new URI(url).toURL(); + } catch (MalformedURLException | URISyntaxException e) { + throw new IOException(String.format("invalid URL: \"%s\"", url), e); + } + } +} diff --git a/portability-spi-cloud/src/main/java/org/datatransferproject/spi/cloud/storage/TemporaryPerJobDataStore.java b/portability-spi-cloud/src/main/java/org/datatransferproject/spi/cloud/storage/TemporaryPerJobDataStore.java index 9f8faa00b..c103d0030 100644 --- a/portability-spi-cloud/src/main/java/org/datatransferproject/spi/cloud/storage/TemporaryPerJobDataStore.java +++ b/portability-spi-cloud/src/main/java/org/datatransferproject/spi/cloud/storage/TemporaryPerJobDataStore.java @@ -1,5 +1,6 @@ package org.datatransferproject.spi.cloud.storage; + import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -70,6 +71,13 @@ public InputStream getStream() { return stream; } + /** + * Size of the stream as a byte-count, or zero if yet unknown. + * + *

See also {@link org.datatransferproject.transfer.CallableSizeCalculator} and {@link + * com.google.common.io.CountingInputStream} for some brute-force options when the return is + * zero. + */ public Long getBytes() { return bytes; } diff --git a/portability-types-common/src/main/java/org/datatransferproject/types/common/DownloadableItem.java b/portability-types-common/src/main/java/org/datatransferproject/types/common/DownloadableItem.java index 7dde045c6..e8b387e9d 100644 --- a/portability-types-common/src/main/java/org/datatransferproject/types/common/DownloadableItem.java +++ b/portability-types-common/src/main/java/org/datatransferproject/types/common/DownloadableItem.java @@ -1,11 +1,14 @@ package org.datatransferproject.types.common; /** - * Represent an item we can download through a URL and store in a temporary storage. PhotoModel is a - * good example. Often, we check if the item is in the job store and download it if it isn't. + * Represent an item we can download through a URL and store in a temporary storage. + * + *

PhotoModel is a good example. Often, we check if the item is in the job store and download it + * if it isn't. */ public interface DownloadableItem extends ImportableItem { + /** Remote or local URL used to download and identify an item. */ String getFetchableUrl(); boolean isInTempStore(); From 52510bf84c92567b657d253ad0abca20fe65bee2 Mon Sep 17 00:00:00 2001 From: Jonathan Zacsh Date: Tue, 19 Nov 2024 15:07:32 -0600 Subject: [PATCH 2/2] noop(cleanup) dedup w/factored-out (#1390) counter this utilizes the newly factored out streaming counter, so _should_ be noop. --- portability-transfer/build.gradle | 1 + .../transfer/CallableSizeCalculator.java | 24 +++++-------------- 2 files changed, 7 insertions(+), 18 deletions(-) diff --git a/portability-transfer/build.gradle b/portability-transfer/build.gradle index b2a24104d..9a2dab8d2 100644 --- a/portability-transfer/build.gradle +++ b/portability-transfer/build.gradle @@ -36,6 +36,7 @@ configurations { dependencies { compile project(':portability-api-launcher') + compile project(':portability-spi-api') compile project(':portability-spi-service') compile project(':portability-spi-cloud') compile project(':portability-spi-transfer') diff --git a/portability-transfer/src/main/java/org/datatransferproject/transfer/CallableSizeCalculator.java b/portability-transfer/src/main/java/org/datatransferproject/transfer/CallableSizeCalculator.java index d450f617d..5984d0cd6 100644 --- a/portability-transfer/src/main/java/org/datatransferproject/transfer/CallableSizeCalculator.java +++ b/portability-transfer/src/main/java/org/datatransferproject/transfer/CallableSizeCalculator.java @@ -1,7 +1,7 @@ package org.datatransferproject.transfer; -import java.io.IOException; -import java.io.InputStream; +import static org.datatransferproject.spi.api.transport.DiscardingStreamCounter.discardForLength; + import java.util.Collection; import java.util.LinkedHashMap; import java.util.Map; @@ -19,7 +19,9 @@ public class CallableSizeCalculator 
implements Callable> { private final Collection items; public CallableSizeCalculator( - UUID jobId, ConnectionProvider connectionProvider, Collection items) { + UUID jobId, + ConnectionProvider connectionProvider, + Collection items) { this.jobId = Objects.requireNonNull(jobId); this.connectionProvider = Objects.requireNonNull(connectionProvider); this.items = Objects.requireNonNull(items); @@ -32,7 +34,7 @@ public Map call() throws Exception { InputStreamWrapper stream = connectionProvider.getInputStreamForItem(jobId, item); long size = stream.getBytes(); if (size <= 0) { - size = computeSize(stream); + size = discardForLength(stream.getStream()); } result.put(item.getIdempotentId(), size); @@ -40,18 +42,4 @@ public Map call() throws Exception { return result; } - - // Reads the input stream in full - private Long computeSize(InputStreamWrapper stream) throws IOException { - long size = 0; - try (InputStream inStream = stream.getStream()) { - byte[] buffer = new byte[1024 * 1024]; // 1MB - int chunkBytesRead; - while ((chunkBytesRead = inStream.read(buffer)) != -1) { - size += chunkBytesRead; - } - } - - return size; - } }