-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add multipart upload integration for translog and segment files
Signed-off-by: Raghuvansh Raj <[email protected]>
- Loading branch information
1 parent
72ed13b
commit 33f3a69
Showing
19 changed files
with
928 additions
and
46 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
43 changes: 43 additions & 0 deletions
43
...internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.remotestore.multipart; | ||
|
||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.plugins.Plugin; | ||
import org.opensearch.remotestore.RemoteStoreIT; | ||
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin; | ||
|
||
import java.nio.file.Path; | ||
import java.util.Collection; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.Stream; | ||
|
||
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; | ||
|
||
public class RemoteStoreMultipartIT extends RemoteStoreIT { | ||
|
||
@Override | ||
protected Collection<Class<? extends Plugin>> nodePlugins() { | ||
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockFsRepositoryPlugin.class)).collect(Collectors.toList()); | ||
} | ||
|
||
@Override | ||
protected void putRepository(Path path) { | ||
assertAcked( | ||
clusterAdmin().preparePutRepository(REPOSITORY_NAME) | ||
.setType(MockFsRepositoryPlugin.TYPE) | ||
.setSettings(Settings.builder().put("location", path)) | ||
); | ||
} | ||
|
||
@Override | ||
protected void deleteRepository() { | ||
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); | ||
} | ||
} |
103 changes: 103 additions & 0 deletions
103
...ernalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsBlobContainer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.remotestore.multipart.mocks; | ||
|
||
import org.opensearch.common.Stream; | ||
import org.opensearch.common.StreamProvider; | ||
import org.opensearch.common.blobstore.BlobPath; | ||
import org.opensearch.common.blobstore.fs.FsBlobContainer; | ||
import org.opensearch.common.blobstore.fs.FsBlobStore; | ||
import org.opensearch.common.blobstore.stream.StreamContext; | ||
import org.opensearch.common.blobstore.stream.write.UploadResponse; | ||
import org.opensearch.common.blobstore.stream.write.WriteContext; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.io.OutputStream; | ||
import java.nio.file.Files; | ||
import java.nio.file.Path; | ||
import java.nio.file.StandardOpenOption; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
|
||
public class MockFsBlobContainer extends FsBlobContainer { | ||
|
||
private static final int TRANSFER_TIMEOUT_MILLIS = 30000; | ||
|
||
public MockFsBlobContainer(FsBlobStore blobStore, BlobPath blobPath, Path path) { | ||
super(blobStore, blobPath, path); | ||
} | ||
|
||
@Override | ||
public boolean isMultiStreamUploadSupported() { | ||
return true; | ||
} | ||
|
||
@Override | ||
public CompletableFuture<UploadResponse> writeBlobByStreams(WriteContext writeContext) throws IOException { | ||
CompletableFuture<UploadResponse> completableFuture = new CompletableFuture<>(); | ||
|
||
int nParts = 10; | ||
long partSize = writeContext.getFileSize() / nParts; | ||
StreamContext streamContext = writeContext.getStreamContext(partSize); | ||
StreamProvider streamProvider = streamContext.getStreamProvider(); | ||
final Path file = path.resolve(writeContext.getFileName()); | ||
byte[] buffer = new byte[(int) writeContext.getFileSize()]; | ||
AtomicLong totalContentRead = new AtomicLong(); | ||
CountDownLatch latch = new CountDownLatch(streamContext.getNumberOfParts()); | ||
for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) { | ||
int finalPartIdx = partIdx; | ||
Thread thread = new Thread(() -> { | ||
try { | ||
Stream stream = streamProvider.provideStream(finalPartIdx); | ||
InputStream inputStream = stream.getInputStream(); | ||
long remainingContentLength = stream.getContentLength(); | ||
long offset = stream.getOffset(); | ||
while (remainingContentLength > 0) { | ||
int readContentLength = inputStream.read(buffer, (int) offset, (int) remainingContentLength); | ||
totalContentRead.addAndGet(readContentLength); | ||
remainingContentLength -= readContentLength; | ||
offset += readContentLength; | ||
} | ||
inputStream.close(); | ||
} catch (IOException e) { | ||
completableFuture.completeExceptionally(e); | ||
} finally { | ||
latch.countDown(); | ||
} | ||
}); | ||
thread.start(); | ||
} | ||
try { | ||
if (!latch.await(TRANSFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { | ||
throw new IOException("Timed out waiting for file transfer to complete for " + writeContext.getFileName()); | ||
} | ||
} catch (InterruptedException e) { | ||
throw new IOException("Await interrupted on CountDownLatch, transfer failed for " + writeContext.getFileName()); | ||
} | ||
try (OutputStream outputStream = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) { | ||
outputStream.write(buffer); | ||
} | ||
if (writeContext.getFileSize() != totalContentRead.get()) { | ||
throw new IOException( | ||
"Incorrect content length read for file " | ||
+ writeContext.getFileName() | ||
+ ", actual file size: " | ||
+ writeContext.getFileSize() | ||
+ ", bytes read: " | ||
+ totalContentRead.get() | ||
); | ||
} | ||
completableFuture.complete(new UploadResponse(true)); | ||
|
||
return completableFuture; | ||
} | ||
} |
33 changes: 33 additions & 0 deletions
33
.../internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsBlobStore.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.remotestore.multipart.mocks; | ||
|
||
import org.opensearch.OpenSearchException; | ||
import org.opensearch.common.blobstore.BlobContainer; | ||
import org.opensearch.common.blobstore.BlobPath; | ||
import org.opensearch.common.blobstore.fs.FsBlobStore; | ||
|
||
import java.io.IOException; | ||
import java.nio.file.Path; | ||
|
||
public class MockFsBlobStore extends FsBlobStore { | ||
|
||
public MockFsBlobStore(int bufferSizeInBytes, Path path, boolean readonly) throws IOException { | ||
super(bufferSizeInBytes, path, readonly); | ||
} | ||
|
||
@Override | ||
public BlobContainer blobContainer(BlobPath path) { | ||
try { | ||
return new MockFsBlobContainer(this, path, buildAndCreate(path)); | ||
} catch (IOException ex) { | ||
throw new OpenSearchException("failed to create blob container", ex); | ||
} | ||
} | ||
} |
37 changes: 37 additions & 0 deletions
37
...internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsRepository.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.remotestore.multipart.mocks; | ||
|
||
import org.opensearch.cluster.metadata.RepositoryMetadata; | ||
import org.opensearch.cluster.service.ClusterService; | ||
import org.opensearch.common.blobstore.BlobStore; | ||
import org.opensearch.common.blobstore.fs.FsBlobStore; | ||
import org.opensearch.core.xcontent.NamedXContentRegistry; | ||
import org.opensearch.env.Environment; | ||
import org.opensearch.indices.recovery.RecoverySettings; | ||
import org.opensearch.repositories.fs.FsRepository; | ||
|
||
public class MockFsRepository extends FsRepository { | ||
|
||
public MockFsRepository( | ||
RepositoryMetadata metadata, | ||
Environment environment, | ||
NamedXContentRegistry namedXContentRegistry, | ||
ClusterService clusterService, | ||
RecoverySettings recoverySettings | ||
) { | ||
super(metadata, environment, namedXContentRegistry, clusterService, recoverySettings); | ||
} | ||
|
||
@Override | ||
protected BlobStore createBlobStore() throws Exception { | ||
FsBlobStore fsBlobStore = (FsBlobStore) super.createBlobStore(); | ||
return new MockFsBlobStore(fsBlobStore.bufferSizeInBytes(), fsBlobStore.path(), isReadOnly()); | ||
} | ||
} |
38 changes: 38 additions & 0 deletions
38
...alClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsRepositoryPlugin.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.remotestore.multipart.mocks; | ||
|
||
import org.opensearch.cluster.service.ClusterService; | ||
import org.opensearch.core.xcontent.NamedXContentRegistry; | ||
import org.opensearch.env.Environment; | ||
import org.opensearch.indices.recovery.RecoverySettings; | ||
import org.opensearch.plugins.Plugin; | ||
import org.opensearch.plugins.RepositoryPlugin; | ||
import org.opensearch.repositories.Repository; | ||
|
||
import java.util.Collections; | ||
import java.util.Map; | ||
|
||
public class MockFsRepositoryPlugin extends Plugin implements RepositoryPlugin { | ||
|
||
public static final String TYPE = "fs_multipart_repository"; | ||
|
||
@Override | ||
public Map<String, Repository.Factory> getRepositories( | ||
Environment env, | ||
NamedXContentRegistry namedXContentRegistry, | ||
ClusterService clusterService, | ||
RecoverySettings recoverySettings | ||
) { | ||
return Collections.singletonMap( | ||
"fs_multipart_repository", | ||
metadata -> new MockFsRepository(metadata, env, namedXContentRegistry, clusterService, recoverySettings) | ||
); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
33 changes: 33 additions & 0 deletions
33
server/src/main/java/org/opensearch/common/blobstore/stream/write/UploadResponse.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.common.blobstore.stream.write; | ||
|
||
/** | ||
* Response object for uploads using <code>BlobContainer#writeBlobByStreams</code> | ||
*/ | ||
public class UploadResponse { | ||
|
||
private final boolean uploadSuccessful; | ||
|
||
/** | ||
* Construct a new UploadResponse object | ||
* | ||
* @param uploadSuccessful Whether the current upload was successful or not | ||
*/ | ||
public UploadResponse(boolean uploadSuccessful) { | ||
this.uploadSuccessful = uploadSuccessful; | ||
} | ||
|
||
/** | ||
* @return The upload success result | ||
*/ | ||
public boolean isUploadSuccessful() { | ||
return uploadSuccessful; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.