Skip to content

Commit

Permalink
Upload blob from input stream (opensearch-project#13836)
Browse files Browse the repository at this point in the history
* Blobstore transfer of cluster metadata from the underlying input stream

Signed-off-by: Sooraj Sinha <[email protected]>
  • Loading branch information
soosinha authored and wangdongyu.danny committed Aug 22, 2024
1 parent 0396e14 commit a2a715a
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.gateway.remote.model;

import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.RemoteWritableEntityStore;
import org.opensearch.common.remote.RemoteWriteableEntity;
Expand Down Expand Up @@ -54,15 +55,20 @@ public void writeAsync(final U entity, final ActionListener<Void> listener) {
try (InputStream inputStream = entity.serialize()) {
BlobPath blobPath = getBlobPathForUpload(entity);
entity.setFullBlobName(blobPath);
// TODO uncomment below logic after merging PR https://github.com/opensearch-project/OpenSearch/pull/13836
// transferService.uploadBlob(inputStream, getBlobPathForUpload(entity), entity.getBlobFileName(), WritePriority.URGENT,
// listener);
transferService.uploadBlob(
inputStream,
getBlobPathForUpload(entity),
entity.getBlobFileName(),
WritePriority.URGENT,
listener
);
}
} catch (Exception e) {
listener.onFailure(e);
}
}

@Override
public T read(final U entity) throws IOException {
// TODO Add timing logs and tracing
assert entity.getFullBlobName() != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.IndexInput;
import org.opensearch.action.ActionRunnable;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
Expand All @@ -19,11 +20,13 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.blobstore.InputStreamWithMetadata;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeFileInputStream;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.store.exception.ChecksumCombinationException;
import org.opensearch.index.translog.ChannelFactory;
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -41,6 +44,7 @@
import java.util.Set;

import static org.opensearch.common.blobstore.BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC;
import static org.opensearch.common.blobstore.transfer.RemoteTransferContainer.checksumOfChecksum;
import static org.opensearch.index.translog.transfer.TranslogTransferManager.CHECKPOINT_FILE_DATA_KEY;

/**
Expand All @@ -53,6 +57,7 @@ public class BlobStoreTransferService implements TransferService {
private final BlobStore blobStore;
private final ThreadPool threadPool;

private static final int CHECKSUM_BYTES_LENGTH = 8;
private static final Logger logger = LogManager.getLogger(BlobStoreTransferService.class);

public BlobStoreTransferService(BlobStore blobStore, ThreadPool threadPool) {
Expand Down Expand Up @@ -108,6 +113,40 @@ public void uploadBlobs(

}

@Override
public void uploadBlob(
InputStream inputStream,
Iterable<String> remotePath,
String fileName,
WritePriority writePriority,
ActionListener<Void> listener
) throws IOException {
assert remotePath instanceof BlobPath;
BlobPath blobPath = (BlobPath) remotePath;
final BlobContainer blobContainer = blobStore.blobContainer(blobPath);
if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) {
blobContainer.writeBlob(fileName, inputStream, inputStream.available(), false);
listener.onResponse(null);
return;
}
final String resourceDescription = "BlobStoreTransferService.uploadBlob(blob=\"" + fileName + "\")";
byte[] bytes = inputStream.readAllBytes();
try (IndexInput input = new ByteArrayIndexInput(resourceDescription, bytes)) {
long expectedChecksum = computeChecksum(input, resourceDescription);
uploadBlobAsyncInternal(
fileName,
fileName,
bytes.length,
blobPath,
writePriority,
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
expectedChecksum,
listener,
null
);
}
}

// Builds a metadata map containing the Base64-encoded checkpoint file data associated with a translog file.
static Map<String, String> buildTransferFileMetadata(InputStream metadataInputStream) throws IOException {
Map<String, String> metadata = new HashMap<>();
Expand Down Expand Up @@ -150,37 +189,23 @@ private void uploadBlob(
try (FileChannel channel = channelFactory.open(fileSnapshot.getPath(), StandardOpenOption.READ)) {
contentLength = channel.size();
}
boolean remoteIntegrityEnabled = false;
BlobContainer blobContainer = blobStore.blobContainer(blobPath);
if (blobContainer instanceof AsyncMultiStreamBlobContainer) {
remoteIntegrityEnabled = ((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported();
}
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
ActionListener<Void> completionListener = ActionListener.wrap(resp -> listener.onResponse(fileSnapshot), ex -> {
logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), ex);
listener.onFailure(new FileTransferException(fileSnapshot, ex));
});

Objects.requireNonNull(fileSnapshot.getChecksum());
uploadBlobAsyncInternal(
fileSnapshot.getName(),
fileSnapshot.getName(),
contentLength,
true,
blobPath,
writePriority,
(size, position) -> new OffsetRangeFileInputStream(fileSnapshot.getPath(), size, position),
Objects.requireNonNull(fileSnapshot.getChecksum()),
remoteIntegrityEnabled,
fileSnapshot.getChecksum(),
completionListener,
metadata
);
ActionListener<Void> completionListener = ActionListener.wrap(resp -> listener.onResponse(fileSnapshot), ex -> {
logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), ex);
listener.onFailure(new FileTransferException(fileSnapshot, ex));
});

completionListener = ActionListener.runBefore(completionListener, () -> {
try {
remoteTransferContainer.close();
} catch (Exception e) {
logger.warn("Error occurred while closing streams", e);
}
});

WriteContext writeContext = remoteTransferContainer.createWriteContext();
((AsyncMultiStreamBlobContainer) blobStore.blobContainer(blobPath)).asyncBlobUpload(writeContext, completionListener);

} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), e);
Expand All @@ -195,6 +220,40 @@ private void uploadBlob(

}

private void uploadBlobAsyncInternal(
String fileName,
String remoteFileName,
long contentLength,
BlobPath blobPath,
WritePriority writePriority,
RemoteTransferContainer.OffsetRangeInputStreamSupplier inputStreamSupplier,
long expectedChecksum,
ActionListener<Void> completionListener,
Map<String, String> metadata
) throws IOException {
BlobContainer blobContainer = blobStore.blobContainer(blobPath);
assert blobContainer instanceof AsyncMultiStreamBlobContainer;
boolean remoteIntegrityEnabled = ((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported();
try (
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
fileName,
remoteFileName,
contentLength,
true,
writePriority,
inputStreamSupplier,
expectedChecksum,
remoteIntegrityEnabled,
metadata
)
) {
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(
remoteTransferContainer.createWriteContext(),
completionListener
);
}
}

@Override
public InputStream downloadBlob(Iterable<String> path, String fileName) throws IOException {
return blobStore.blobContainer((BlobPath) path).readBlob(fileName);
Expand Down Expand Up @@ -276,4 +335,19 @@ public void listAllInSortedOrderAsync(
threadPool.executor(threadpoolName).execute(() -> { listAllInSortedOrder(path, filenamePrefix, limit, listener); });
}

private static long computeChecksum(IndexInput indexInput, String resourceDescription) throws ChecksumCombinationException {
long expectedChecksum;
try {
expectedChecksum = checksumOfChecksum(indexInput.clone(), CHECKSUM_BYTES_LENGTH);
} catch (Exception e) {
throw new ChecksumCombinationException(
"Potentially corrupted file: Checksum combination failed while combining stored checksum "
+ "and calculated checksum of stored checksum",
resourceDescription,
e
);
}
return expectedChecksum;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,23 @@ void uploadBlobs(
*/
void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable<String> remotePath, WritePriority writePriority) throws IOException;

/**
* Reads the input stream and uploads as a blob
* @param inputStream the stream to read from
* @param remotePath the remote path where upload should be made
* @param blobName the name of blob file
* @param writePriority Priority by which content needs to be written.
* @param listener the callback to be invoked once uploads complete successfully/fail
* @throws IOException the exception thrown while uploading
*/
void uploadBlob(
InputStream inputStream,
Iterable<String> remotePath,
String blobName,
WritePriority writePriority,
ActionListener<Void> listener
) throws IOException;

void deleteBlobs(Iterable<String> path, List<String> fileNames) throws IOException;

/**
Expand Down
Loading

0 comments on commit a2a715a

Please sign in to comment.