fix microsoft OOM: don't stream into memory
I'm fixing a bug here: the microsoft adapter was reading an arbitrary
stream entirely into memory (via the DataChunk class) instead of just
streaming directly to the endpoint. It's not _quite_ that simple
though: we actually stream twice, for the very reason the old
implementation buffered everything: we need to know the filesize.
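
A minimal sketch of that two-pass idea (hypothetical names, not code
from this commit): the first pass only counts bytes, the second hands
the re-opened stream to the endpoint along with the size.

    import java.io.IOException;
    import java.io.InputStream;

    class TwoPassSketch {
      /** First pass: learn the stream's size without buffering it all in memory. */
      static long countBytes(InputStream in) throws IOException {
        byte[] scratch = new byte[8192];
        long total = 0;
        int read;
        while ((read = in.read(scratch)) != -1) {
          total += read;
        }
        return total;
      }
      // Second pass (done elsewhere): re-open the stream and upload it
      // chunk by chunk, now that the total filesize is known up front.
    }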

while we're here I'm doing some "leave the campground better" tasks, so
here's a more nitty-gritty breakdown of this PR:
- microsoft adapter: marking duplicated code Microsoft{Photo,Video}* as
  needing to rely on the newly refactored MicrosoftMedia (so these kinds
  of bug fixes are easier to maintain, for example).
- DTP: make testability (via DI) easier for java.net.URL streamers
  (this has already become a pattern, so I just formalized it and
  dropped a TODO in the places that are doing this locally; this way, in
  the future it'll be easier/more obvious what "the pattern" is and how
  to stop maintaining disparate copies of it)
- microsoft adapter: remainder of DataChunk code is now just a POJO, so
  switching to autovalue and letting the test live in the primary user:
  the new `StreamChunker` (as a mini "integrated" test).
- microsoft adapter: all size-related operations are a `long` now
jzacsh committed Nov 19, 2024
1 parent 2b9252c commit 547a696
Showing 20 changed files with 518 additions and 254 deletions.
@@ -318,6 +318,8 @@ private static String cleanString(String string) {
     return Strings.isNullOrEmpty(string) ? "" : string;
   }
 
+  // TODO migrate this testability-surface to newly shared JobFileStreamer and RemoteFileStreamer
+  // of org.datatransferproject.spi.api.transport package.
   @VisibleForTesting
   class ImageStreamProvider {
@@ -21,20 +21,24 @@ plugins {
 dependencies {
     compile project(':portability-spi-cloud')
     compile project(':portability-spi-transfer')
+    compile project(':portability-spi-api')
 
     compile "com.squareup.okhttp3:okhttp:${okHttpVersion}"
     compile "com.squareup.okhttp3:logging-interceptor:${okHttpVersion}"
     compile("com.google.api-client:google-api-client:${googleApiClient}")
 
+    compileOnly "com.google.auto.value:auto-value-annotations:${autoValueVersion}"
+    annotationProcessor "com.google.auto.value:auto-value:${autoValueVersion}"
+
     // REVIEW: We could standardize the version in gradle.properties but this would mean all dependent extensions must be revved at the same time
     compile 'com.googlecode.ez-vcard:ez-vcard:0.10.3'
 
-    testImplementation "org.mockito:mockito-inline:${mockitoInlineVersion}"
     testCompile "org.mockito:mockito-core:${mockitoCoreVersion}"
     testCompile project(':extensions:auth:portability-auth-harness-microsoft')
-    testCompile group: 'com.squareup.okhttp', name: 'mockwebserver', version: '2.7.5'
+    testCompile group: 'com.squareup.okhttp3', name: 'mockwebserver', version: '3.2.0'
     testCompile("com.google.http-client:google-http-client-gson:${googleHttpClientVersion}")
+    testImplementation "org.mockito:mockito-inline:${mockitoVersion}"
 
 }
 
 configurePublication(project)
@@ -1,70 +1,38 @@
 package org.datatransferproject.transfer.microsoft;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
+import com.google.auto.value.AutoValue;
 
-/**
-This utility class allows us to break up an InputStream into multiple chunks
-for part-by-part upload to a service, for example to be consumed in an upload session.
-*/
-public class DataChunk {
-  private static final int CHUNK_SIZE = 32000 * 1024; // 32000KiB
+/** Describe small buffers of bytes captured from a large java.io Stream. */
+@AutoValue
+public abstract class DataChunk {
+  /** Bytes being held in this buffer. */
+  public abstract byte[] chunk();
 
-  private final byte[] data;
-  private final int size;
-  private final int rangeStart;
-
-  public DataChunk(byte[] data, int size, int rangeStart) {
-    this.data = data;
-    this.size = size;
-    this.rangeStart = rangeStart;
+  /** Byte count of {@link chunk}. */
+  public int size() {
+    return chunk().length;
   }
 
-  public int getSize() {
-    return size;
-  }
+  /** Index-offset within the original java.io Stream at which {@link chunk} had started. */
+  public abstract long streamByteOffset();
 
-  public byte[] getData() {
-    return data;
+  /**
+   * Index-offset within the original java.io Stream at which the final byte of {@link chunk} lived.
+   */
+  public long finalByteOffset() {
+    return streamByteOffset() + size() - 1;
   }
 
-  public int getStart() {
-    return rangeStart;
+  public static Builder builder() {
+    return new org.datatransferproject.transfer.microsoft.AutoValue_DataChunk.Builder();
   }
 
-  public int getEnd() {
-    return rangeStart + size - 1;
-  }
+  @AutoValue.Builder
+  public abstract static class Builder {
+    public abstract Builder setChunk(byte[] value);
 
-  public static List<DataChunk> splitData(InputStream inputStream) throws IOException {
-    ArrayList<DataChunk> chunksToSend = new ArrayList();
-    byte[] data = new byte[CHUNK_SIZE];
-    int totalFileSize = 0;
-    int quantityToSend;
-    int roomLeft = CHUNK_SIZE;
-    int offset = 0;
-    int chunksRead = 0;
+    public abstract Builder setStreamByteOffset(long value);
 
-    // start timing
-    while ((quantityToSend = inputStream.read(data, offset, roomLeft)) != -1) {
-      offset += quantityToSend;
-      roomLeft -= quantityToSend;
-      if (roomLeft == 0) {
-        chunksToSend.add(new DataChunk(data, CHUNK_SIZE, chunksRead * CHUNK_SIZE));
-        chunksRead++;
-        roomLeft = CHUNK_SIZE;
-        offset = 0;
-        totalFileSize += CHUNK_SIZE;
-        data = new byte[CHUNK_SIZE];
-      }
-    }
-    if (offset != 0) {
-      chunksToSend.add(new DataChunk(data, offset, chunksRead * CHUNK_SIZE));
-      totalFileSize += offset;
-      chunksRead++;
-    }
-    return chunksToSend;
+    public abstract DataChunk build();
   }
 }
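
A hypothetical aside (not code from this commit): the point of the new
long offsets is that resumable upload sessions take a Content-Range
header of the form "bytes start-end/total", which maps directly onto a
chunk's accessors plus the filesize learned up front.

    class ContentRangeSketch {
      // chunk is a DataChunk as defined above; totalFileSize is assumed
      // to come from a first pass over the stream.
      static String contentRangeFor(DataChunk chunk, long totalFileSize) {
        return String.format(
            "bytes %d-%d/%d",
            chunk.streamByteOffset(), chunk.finalByteOffset(), totalFileSize);
      }
    }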
@@ -15,6 +15,7 @@
 import okhttp3.OkHttpClient;
 import org.datatransferproject.api.launcher.ExtensionContext;
 import org.datatransferproject.api.launcher.Monitor;
+import org.datatransferproject.spi.api.transport.JobFileStream;
 import org.datatransferproject.spi.cloud.storage.AppCredentialStore;
 import org.datatransferproject.spi.cloud.storage.TemporaryPerJobDataStore;
 import org.datatransferproject.types.common.models.DataVertical;
@@ -120,6 +121,7 @@ public void initialize(ExtensionContext context) {
     // Create the MicrosoftCredentialFactory with the given {@link AppCredentials}.
     MicrosoftCredentialFactory credentialFactory =
         new MicrosoftCredentialFactory(httpTransport, jsonFactory, appCredentials);
+    JobFileStream jobFileStream = new JobFileStream();
 
     Monitor monitor = context.getMonitor();
 
@@ -132,9 +134,9 @@ public void initialize(ExtensionContext context) {
         new MicrosoftCalendarImporter(BASE_GRAPH_URL, client, mapper, transformerService));
     importBuilder.put(
         PHOTOS, new MicrosoftPhotosImporter(BASE_GRAPH_URL, client, mapper, jobStore, monitor,
-            credentialFactory));
+            credentialFactory, jobFileStream));
     importBuilder.put(MEDIA, new MicrosoftMediaImporter(BASE_GRAPH_URL, client, mapper, jobStore, monitor,
-            credentialFactory));
+            credentialFactory, jobFileStream));
     importerMap = importBuilder.build();
 
     ImmutableMap.Builder<DataVertical, Exporter> exporterBuilder = ImmutableMap.builder();
@@ -0,0 +1,42 @@
+package org.datatransferproject.transfer.microsoft;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Optional;
+
+/**
+ * Allows tracking reads across a stream.
+ *
+ * <p>Does not close the held input stream.
+ */
+public class StreamChunker {
+  private final int chunkSizeBytes;
+  private final InputStream inputStream;
+
+  private long streamByteOffset = 0;
+
+  public StreamChunker(int chunkSizeBytes, InputStream inputStream) {
+    this.inputStream = inputStream;
+    this.chunkSizeBytes = chunkSizeBytes;
+  }
+
+  /**
+   * Constructs a new DataChunk from just {@code chunkSizeBytes} bytes of the stream.
+   *
+   * <p>Returned chunk will be less than or equal to chunkSizeBytes, or absent if no bytes were
+   * remaining in the stream.
+   */
+  public Optional<DataChunk> nextChunk() throws IOException {
+    byte[] chunkOfData = inputStream.readNBytes(chunkSizeBytes);
+    Optional<DataChunk> resp =
+        chunkOfData.length == 0
+            ? Optional.empty()
+            : Optional.of(
+                DataChunk.builder()
+                    .setChunk(chunkOfData)
+                    .setStreamByteOffset(streamByteOffset)
+                    .build());
+    streamByteOffset += chunkOfData.length;
+    return resp;
+  }
+}
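
The intended consumption pattern, sketched with a hypothetical
uploadPart callback standing in for whatever posts one part to the
service's upload session:

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Optional;
    import java.util.function.Consumer;

    class ChunkedUploadSketch {
      static void uploadAll(InputStream in, Consumer<DataChunk> uploadPart) throws IOException {
        // 32000 KiB per part, matching the chunk size the old DataChunk code used.
        StreamChunker chunker = new StreamChunker(32000 * 1024, in);
        Optional<DataChunk> chunk;
        while ((chunk = chunker.nextChunk()).isPresent()) {
          uploadPart.accept(chunk.get());
        }
      }
    }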