Skip to content

Commit

Permalink
remove metdata changes for asyn download flow
Browse files Browse the repository at this point in the history
Signed-off-by: Sandeep Kumawat <[email protected]>
  • Loading branch information
skumawat2025 committed Apr 11, 2024
1 parent b528cfa commit 070cf91
Show file tree
Hide file tree
Showing 11 changed files with 21 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.opensearch.common.annotation.ExperimentalApi;

import java.io.InputStream;
import java.util.Map;

/**
* Model composed of an input stream and the total content length of the stream
Expand All @@ -24,20 +23,17 @@ public class InputStreamContainer {
private final InputStream inputStream;
private final long contentLength;
private final long offset;
private final Map<String, String> metadata;

/**
* Construct a new stream object
*
* @param inputStream The input stream that is to be encapsulated
* @param contentLength The total content length that is to be read from the stream
* @param metadata The metadata of the blob. This will be same for each part download.
*/
public InputStreamContainer(InputStream inputStream, long contentLength, long offset, Map<String, String> metadata) {
public InputStreamContainer(InputStream inputStream, long contentLength, long offset) {
this.inputStream = inputStream;
this.contentLength = contentLength;
this.offset = offset;
this.metadata = metadata;
}

/**
Expand All @@ -60,11 +56,4 @@ public long getContentLength() {
public long getOffset() {
return offset;
}

/**
* @return metadata of the source content.
*/
public Map<String, String> getMetadata() {
return metadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ private void testWriteBlobByStreams(boolean expectException, boolean throwExcept
StreamContextSupplier streamContextSupplier = partSize -> new StreamContext((partNo, size, position) -> {
InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position, null);
return new InputStreamContainer(inputStream, size, position);
}, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));

CheckedConsumer<Boolean, IOException> uploadFinalizer = uploadSuccess -> {
Expand Down Expand Up @@ -529,7 +529,7 @@ private void testWriteBlobByStreamsLargeBlob(boolean expectException, boolean th
StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> {
InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position, null);
return new InputStreamContainer(inputStream, size, position);
}, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1));

CheckedConsumer<Boolean, IOException> uploadFinalizer = uploadSuccess -> {
Expand Down Expand Up @@ -651,7 +651,7 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) t
StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> {
InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position, null);
return new InputStreamContainer(inputStream, size, position);
}, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1));

WriteContext writeContext = new WriteContext.Builder().fileName("write_large_blob")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ public void testWriteBlobByStreamsWithRetries() throws Exception {
StreamContextSupplier streamContextSupplier = partSize -> new StreamContext((partNo, size, position) -> {
InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position, null);
return new InputStreamContainer(inputStream, size, position);
}, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));

WriteContext writeContext = new WriteContext.Builder().fileName("write_blob_by_streams_max_retries")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void testOneChunkUpload() {
}, false, null, true, null),
new StreamContext((partIdx, partSize, position) -> {
streamRef.set(new ZeroInputStream(partSize));
return new InputStreamContainer(streamRef.get(), partSize, position, null);
return new InputStreamContainer(streamRef.get(), partSize, position);
}, ByteSizeUnit.MB.toBytes(1), ByteSizeUnit.MB.toBytes(1), 1),
new StatsMetricPublisher()
);
Expand Down Expand Up @@ -129,7 +129,7 @@ public void testOneChunkUploadCorruption() {
// do nothing
}, false, null, true, null),
new StreamContext(
(partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position, null),
(partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position),
ByteSizeUnit.MB.toBytes(1),
ByteSizeUnit.MB.toBytes(1),
1
Expand Down Expand Up @@ -184,7 +184,7 @@ public void testMultipartUpload() {
new StreamContext((partIdx, partSize, position) -> {
InputStream stream = new ZeroInputStream(partSize);
streams.add(stream);
return new InputStreamContainer(stream, partSize, position, null);
return new InputStreamContainer(stream, partSize, position);
}, ByteSizeUnit.MB.toBytes(1), ByteSizeUnit.MB.toBytes(1), 5),
new StatsMetricPublisher()
);
Expand Down Expand Up @@ -242,7 +242,7 @@ public void testMultipartUploadCorruption() {
// do nothing
}, true, 0L, true, null),
new StreamContext(
(partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position, null),
(partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position),
ByteSizeUnit.MB.toBytes(1),
ByteSizeUnit.MB.toBytes(1),
5
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,7 @@ public void readBlobAsync(String blobName, ActionListener<ReadContext> listener)
List<ReadContext.StreamPartCreator> blobPartStreams = new ArrayList<>();
for (int partNumber = 0; partNumber < numberOfParts; partNumber++) {
long offset = partNumber * partSize;
InputStreamContainer blobPartStream = new InputStreamContainer(
readBlob(blobName, offset, partSize),
partSize,
offset,
null
);
InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset);
blobPartStreams.add(() -> CompletableFuture.completedFuture(blobPartStream));
}
ReadContext blobReadContext = new ReadContext.Builder(contentLength, blobPartStreams).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ private InputStreamContainer decryptInputStreamContainer(InputStreamContainer in
long adjustedLength = decryptedStreamProvider.getAdjustedRange()[1] - adjustedPos + 1;
final InputStream decryptedStream = decryptedStreamProvider.getDecryptedStreamProvider()
.apply(inputStreamContainer.getInputStream());
return new InputStreamContainer(decryptedStream, adjustedLength, adjustedPos, null);
return new InputStreamContainer(decryptedStream, adjustedLength, adjustedPos);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public long readBlobPreferredLength() {
private void executeWrite(InputStream inputStream, long blobSize, CheckedBiConsumer<InputStream, Long, IOException> writeConsumer)
throws IOException {
T cryptoContext = cryptoHandler.initEncryptionMetadata();
InputStreamContainer streamContainer = new InputStreamContainer(inputStream, blobSize, 0, null);
InputStreamContainer streamContainer = new InputStreamContainer(inputStream, blobSize, 0);
InputStreamContainer encryptedStream = cryptoHandler.createEncryptingStream(cryptoContext, streamContainer);
long cryptoLength = cryptoHandler.estimateEncryptedLengthOfEntireContent(cryptoContext, blobSize);
writeConsumer.accept(encryptedStream.getInputStream(), cryptoLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ private LocalStreamSupplier<InputStreamContainer> getMultipartStreamSupplier(
inputStream = offsetRangeInputStream;
}

return new InputStreamContainer(inputStream, size, position, null);
return new InputStreamContainer(inputStream, size, position);
} catch (IOException e) {
log.error("Failed to create input stream", e);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void testReadBlobAsync() throws Exception {
final byte[] data = new byte[size];
Randomness.get().nextBytes(data);

final InputStreamContainer inputStreamContainer = new InputStreamContainer(new ByteArrayInputStream(data), data.length, 0, null);
final InputStreamContainer inputStreamContainer = new InputStreamContainer(new ByteArrayInputStream(data), data.length, 0);
final ListenerTestUtils.CountingCompletionListener<ReadContext> completionListener =
new ListenerTestUtils.CountingCompletionListener<>();
final CompletableFuture<InputStreamContainer> streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer);
Expand Down Expand Up @@ -99,7 +99,7 @@ public void testReadBlobAsyncException() throws Exception {
// Objects needed for API call
final byte[] data = new byte[size];
Randomness.get().nextBytes(data);
final InputStreamContainer inputStreamContainer = new InputStreamContainer(new ByteArrayInputStream(data), data.length, 0, null);
final InputStreamContainer inputStreamContainer = new InputStreamContainer(new ByteArrayInputStream(data), data.length, 0);
final ListenerTestUtils.CountingCompletionListener<ReadContext> completionListener =
new ListenerTestUtils.CountingCompletionListener<>();
final CompletableFuture<InputStreamContainer> streamContainerFuture = CompletableFuture.completedFuture(inputStreamContainer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void testFilePartWriter() throws Exception {
Path segmentFilePath = path.resolve(UUID.randomUUID().toString());
int contentLength = 100;
InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(contentLength));
InputStreamContainer inputStreamContainer = new InputStreamContainer(inputStream, inputStream.available(), 0, null);
InputStreamContainer inputStreamContainer = new InputStreamContainer(inputStream, inputStream.available(), 0);

FilePartWriter.write(segmentFilePath, inputStreamContainer, UnaryOperator.identity());

Expand All @@ -45,7 +45,7 @@ public void testFilePartWriterWithOffset() throws Exception {
int contentLength = 100;
int offset = 10;
InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(contentLength));
InputStreamContainer inputStreamContainer = new InputStreamContainer(inputStream, inputStream.available(), offset, null);
InputStreamContainer inputStreamContainer = new InputStreamContainer(inputStream, inputStream.available(), offset);

FilePartWriter.write(segmentFilePath, inputStreamContainer, UnaryOperator.identity());

Expand All @@ -57,7 +57,7 @@ public void testFilePartWriterLargeInput() throws Exception {
Path segmentFilePath = path.resolve(UUID.randomUUID().toString());
int contentLength = 20 * 1024 * 1024;
InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(contentLength));
InputStreamContainer inputStreamContainer = new InputStreamContainer(inputStream, contentLength, 0, null);
InputStreamContainer inputStreamContainer = new InputStreamContainer(inputStream, contentLength, 0);

FilePartWriter.write(segmentFilePath, inputStreamContainer, UnaryOperator.identity());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public int available() {
blobPartStreams.add(
NUMBER_OF_PARTS,
() -> CompletableFuture.supplyAsync(
() -> new InputStreamContainer(badInputStream, PART_SIZE, PART_SIZE * NUMBER_OF_PARTS, null),
() -> new InputStreamContainer(badInputStream, PART_SIZE, PART_SIZE * NUMBER_OF_PARTS),
threadPool.generic()
)
);
Expand Down Expand Up @@ -174,7 +174,7 @@ public int read(byte[] b) throws IOException {
blobPartStreams.add(
NUMBER_OF_PARTS,
() -> CompletableFuture.supplyAsync(
() -> new InputStreamContainer(assertingStream, PART_SIZE, PART_SIZE * NUMBER_OF_PARTS, null),
() -> new InputStreamContainer(assertingStream, PART_SIZE, PART_SIZE * NUMBER_OF_PARTS),
threadPool.generic()
)
);
Expand Down Expand Up @@ -219,7 +219,7 @@ private List<ReadContext.StreamPartCreator> initializeBlobPartStreams() {
int finalPartNumber = partNumber;
blobPartStreams.add(
() -> CompletableFuture.supplyAsync(
() -> new InputStreamContainer(testStream, PART_SIZE, (long) finalPartNumber * PART_SIZE, null),
() -> new InputStreamContainer(testStream, PART_SIZE, (long) finalPartNumber * PART_SIZE),
threadPool.generic()
)
);
Expand Down

0 comments on commit 070cf91

Please sign in to comment.