From 697613a934d846ab68cee9004a99edd3a2901bdb Mon Sep 17 00:00:00 2001 From: Raghuvansh Raj Date: Tue, 11 Jul 2023 18:06:01 +0530 Subject: [PATCH] [Remote Store] Add multipart upload integration for translog and segment files (#7119) Signed-off-by: Raghuvansh Raj --- .../common/io/InputStreamContainer.java | 11 +- .../RemoteStoreBaseIntegTestCase.java | 4 +- .../RemoteStoreMultipartFileCorruptionIT.java | 111 +++++++++ .../multipart/RemoteStoreMultipartIT.java | 38 +++ .../multipart/mocks/MockFsBlobStore.java | 36 +++ .../multipart/mocks/MockFsRepository.java | 46 ++++ .../mocks/MockFsRepositoryPlugin.java | 38 +++ .../mocks/MockFsVerifyingBlobContainer.java | 120 +++++++++ .../VerifyingMultiStreamBlobContainer.java | 34 +++ .../transfer/RemoteTransferContainer.java | 2 +- .../org/opensearch/common/util/ByteUtils.java | 10 + .../shard/RemoteStoreRefreshListener.java | 227 ++++++++---------- .../index/store/RemoteDirectory.java | 4 + .../store/RemoteSegmentStoreDirectory.java | 127 ++++++++++ .../ChecksumCombinationException.java | 2 +- .../opensearch/index/translog/Checkpoint.java | 2 +- .../TranslogCheckedContainer.java | 17 +- .../index/translog/TranslogHeader.java | 12 +- .../index/translog/TranslogReader.java | 37 ++- .../index/translog/TranslogWriter.java | 25 +- .../index/translog/checked/package-info.java | 10 - .../transfer/BlobStoreTransferService.java | 98 +++++++- .../index/translog/transfer/FileSnapshot.java | 17 +- .../translog/transfer/TransferService.java | 28 ++- .../TranslogCheckpointTransferSnapshot.java | 10 +- .../transfer/TranslogTransferManager.java | 15 +- .../org/opensearch/threadpool/ThreadPool.java | 2 +- .../RemoteTransferContainerTests.java | 41 ++++ .../RemoteSegmentStoreDirectoryTests.java | 86 +++++++ .../index/store/TestUploadListener.java | 43 ++++ ...oreTransferServiceMockRepositoryTests.java | 189 +++++++++++++++ .../BlobStoreTransferServiceTests.java | 22 +- .../translog/transfer/FileSnapshotTests.java | 6 +- .../transfer/FileTransferTrackerTests.java | 12 +- .../TranslogTransferManagerTests.java | 31 ++- 35 files changed, 1293 insertions(+), 220 deletions(-) rename {server => libs/common}/src/main/java/org/opensearch/common/io/InputStreamContainer.java (85%) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartFileCorruptionIT.java create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsBlobStore.java create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsRepository.java create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsRepositoryPlugin.java create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsVerifyingBlobContainer.java create mode 100644 server/src/main/java/org/opensearch/common/blobstore/VerifyingMultiStreamBlobContainer.java rename server/src/main/java/org/opensearch/index/translog/{checked => }/TranslogCheckedContainer.java (74%) delete mode 100644 server/src/main/java/org/opensearch/index/translog/checked/package-info.java create mode 100644 server/src/test/java/org/opensearch/index/store/TestUploadListener.java create mode 100644 server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceMockRepositoryTests.java diff --git a/server/src/main/java/org/opensearch/common/io/InputStreamContainer.java b/libs/common/src/main/java/org/opensearch/common/io/InputStreamContainer.java similarity index 85% rename from server/src/main/java/org/opensearch/common/io/InputStreamContainer.java rename to libs/common/src/main/java/org/opensearch/common/io/InputStreamContainer.java index ce5dcff9f5349..eb8a4e1382497 100644 --- a/server/src/main/java/org/opensearch/common/io/InputStreamContainer.java +++ b/libs/common/src/main/java/org/opensearch/common/io/InputStreamContainer.java @@ -19,6 +19,7 @@ public class InputStreamContainer { private final InputStream inputStream; private final long contentLength; + private final long offset; /** * Construct a new stream object @@ -26,9 +27,10 @@ public class InputStreamContainer { * @param inputStream The input stream that is to be encapsulated * @param contentLength The total content length that is to be read from the stream */ - public InputStreamContainer(InputStream inputStream, long contentLength) { + public InputStreamContainer(InputStream inputStream, long contentLength, long offset) { this.inputStream = inputStream; this.contentLength = contentLength; + this.offset = offset; } /** @@ -44,4 +46,11 @@ public InputStream getInputStream() { public long getContentLength() { return contentLength; } + + /** + * @return offset of the source content. + */ + public long getOffset() { + return offset; + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index 2b3fcadfc645e..10f01749ab4c5 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -102,9 +102,7 @@ protected void putRepository(Path path) { protected void setupRepo() { internalCluster().startClusterManagerOnlyNode(); absolutePath = randomRepoPath().toAbsolutePath(); - assertAcked( - clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath)) - ); + putRepository(absolutePath); } @After diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartFileCorruptionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartFileCorruptionIT.java new file mode 100644 index 0000000000000..8f375ca6e2b01 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartFileCorruptionIT.java @@ -0,0 +1,111 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore.multipart; + +import org.junit.After; +import org.junit.Before; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.UUIDs; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.IndexModule; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.plugins.Plugin; +import org.opensearch.remotestore.multipart.mocks.MockFsRepository; +import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.file.Path; +import java.util.Collection; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +public class RemoteStoreMultipartFileCorruptionIT extends OpenSearchIntegTestCase { + + protected static final String REPOSITORY_NAME = "test-remore-store-repo"; + private static final String INDEX_NAME = "remote-store-test-idx-1"; + + @Override + protected Collection> nodePlugins() { + return Stream.concat(super.nodePlugins().stream(), Stream.of(MockFsRepositoryPlugin.class)).collect(Collectors.toList()); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build(); + } + + @Before + public void setup() { + internalCluster().startClusterManagerOnlyNode(); + Path absolutePath = randomRepoPath().toAbsolutePath(); + putRepository(absolutePath); + } + + protected void putRepository(Path path) { + assertAcked( + clusterAdmin().preparePutRepository(REPOSITORY_NAME) + .setType(MockFsRepositoryPlugin.TYPE) + .setSettings( + Settings.builder() + .put("location", path) + // custom setting for MockFsRepositoryPlugin + .put(MockFsRepository.TRIGGER_DATA_INTEGRITY_FAILURE.getKey(), true) + ) + ); + } + + @After + public void teardown() { + assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); + } + + protected Settings remoteStoreIndexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put("index.refresh_interval", "300s") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME) + .build(); + } + + private IndexResponse indexSingleDoc() { + return client().prepareIndex(INDEX_NAME) + .setId(UUIDs.randomBase64UUID()) + .setSource(randomAlphaOfLength(5), randomAlphaOfLength(5)) + .get(); + } + + public void testLocalFileCorruptionDuringUpload() { + internalCluster().startDataOnlyNodes(1); + createIndex(INDEX_NAME, remoteStoreIndexSettings()); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + indexSingleDoc(); + + client().admin() + .indices() + .prepareRefresh(INDEX_NAME) + .setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_HIDDEN_FORBID_CLOSED) + .execute() + .actionGet(); + + // ensuring red cluster meaning shard has failed and is unassigned + ensureRed(INDEX_NAME); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java new file mode 100644 index 0000000000000..a523d5c0f5470 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/RemoteStoreMultipartIT.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore.multipart; + +import org.opensearch.common.settings.Settings; +import org.opensearch.plugins.Plugin; +import org.opensearch.remotestore.RemoteStoreIT; +import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin; + +import java.nio.file.Path; +import java.util.Collection; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +public class RemoteStoreMultipartIT extends RemoteStoreIT { + + @Override + protected Collection> nodePlugins() { + return Stream.concat(super.nodePlugins().stream(), Stream.of(MockFsRepositoryPlugin.class)).collect(Collectors.toList()); + } + + @Override + protected void putRepository(Path path) { + assertAcked( + clusterAdmin().preparePutRepository(REPOSITORY_NAME) + .setType(MockFsRepositoryPlugin.TYPE) + .setSettings(Settings.builder().put("location", path)) + ); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsBlobStore.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsBlobStore.java new file mode 100644 index 0000000000000..f1d9fbba84528 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsBlobStore.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore.multipart.mocks; + +import org.opensearch.OpenSearchException; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.fs.FsBlobStore; + +import java.io.IOException; +import java.nio.file.Path; + +public class MockFsBlobStore extends FsBlobStore { + + private final boolean triggerDataIntegrityFailure; + + public MockFsBlobStore(int bufferSizeInBytes, Path path, boolean readonly, boolean triggerDataIntegrityFailure) throws IOException { + super(bufferSizeInBytes, path, readonly); + this.triggerDataIntegrityFailure = triggerDataIntegrityFailure; + } + + @Override + public BlobContainer blobContainer(BlobPath path) { + try { + return new MockFsVerifyingBlobContainer(this, path, buildAndCreate(path), triggerDataIntegrityFailure); + } catch (IOException ex) { + throw new OpenSearchException("failed to create blob container", ex); + } + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsRepository.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsRepository.java new file mode 100644 index 0000000000000..15a9853477081 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsRepository.java @@ -0,0 +1,46 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore.multipart.mocks; + +import org.opensearch.cluster.metadata.RepositoryMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.fs.FsBlobStore; +import org.opensearch.common.settings.Setting; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.repositories.fs.FsRepository; + +public class MockFsRepository extends FsRepository { + + public static Setting TRIGGER_DATA_INTEGRITY_FAILURE = Setting.boolSetting( + "mock_fs_repository.trigger_data_integrity_failure", + false + ); + + private final boolean triggerDataIntegrityFailure; + + public MockFsRepository( + RepositoryMetadata metadata, + Environment environment, + NamedXContentRegistry namedXContentRegistry, + ClusterService clusterService, + RecoverySettings recoverySettings + ) { + super(metadata, environment, namedXContentRegistry, clusterService, recoverySettings); + triggerDataIntegrityFailure = TRIGGER_DATA_INTEGRITY_FAILURE.get(metadata.settings()); + } + + @Override + protected BlobStore createBlobStore() throws Exception { + FsBlobStore fsBlobStore = (FsBlobStore) super.createBlobStore(); + return new MockFsBlobStore(fsBlobStore.bufferSizeInBytes(), fsBlobStore.path(), isReadOnly(), triggerDataIntegrityFailure); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsRepositoryPlugin.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsRepositoryPlugin.java new file mode 100644 index 0000000000000..ffd53adf4e29e --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsRepositoryPlugin.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore.multipart.mocks; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.RepositoryPlugin; +import org.opensearch.repositories.Repository; + +import java.util.Collections; +import java.util.Map; + +public class MockFsRepositoryPlugin extends Plugin implements RepositoryPlugin { + + public static final String TYPE = "fs_multipart_repository"; + + @Override + public Map getRepositories( + Environment env, + NamedXContentRegistry namedXContentRegistry, + ClusterService clusterService, + RecoverySettings recoverySettings + ) { + return Collections.singletonMap( + "fs_multipart_repository", + metadata -> new MockFsRepository(metadata, env, namedXContentRegistry, clusterService, recoverySettings) + ); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsVerifyingBlobContainer.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsVerifyingBlobContainer.java new file mode 100644 index 0000000000000..8f2814eb7c4c4 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsVerifyingBlobContainer.java @@ -0,0 +1,120 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore.multipart.mocks; + +import org.apache.lucene.index.CorruptIndexException; +import org.opensearch.action.ActionListener; +import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer; +import org.opensearch.common.io.InputStreamContainer; +import org.opensearch.common.StreamContext; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.fs.FsBlobContainer; +import org.opensearch.common.blobstore.fs.FsBlobStore; +import org.opensearch.common.blobstore.stream.write.WriteContext; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +public class MockFsVerifyingBlobContainer extends FsBlobContainer implements VerifyingMultiStreamBlobContainer { + + private static final int TRANSFER_TIMEOUT_MILLIS = 30000; + + private final boolean triggerDataIntegrityFailure; + + public MockFsVerifyingBlobContainer(FsBlobStore blobStore, BlobPath blobPath, Path path, boolean triggerDataIntegrityFailure) { + super(blobStore, blobPath, path); + this.triggerDataIntegrityFailure = triggerDataIntegrityFailure; + } + + @Override + public void asyncBlobUpload(WriteContext writeContext, ActionListener completionListener) throws IOException { + + int nParts = 10; + long partSize = writeContext.getFileSize() / nParts; + StreamContext streamContext = writeContext.getStreamProvider(partSize); + final Path file = path.resolve(writeContext.getFileName()); + byte[] buffer = new byte[(int) writeContext.getFileSize()]; + AtomicLong totalContentRead = new AtomicLong(); + CountDownLatch latch = new CountDownLatch(streamContext.getNumberOfParts()); + for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) { + int finalPartIdx = partIdx; + Thread thread = new Thread(() -> { + try { + InputStreamContainer inputStreamContainer = streamContext.provideStream(finalPartIdx); + InputStream inputStream = inputStreamContainer.getInputStream(); + long remainingContentLength = inputStreamContainer.getContentLength(); + long offset = partSize * finalPartIdx; + while (remainingContentLength > 0) { + int readContentLength = inputStream.read(buffer, (int) offset, (int) remainingContentLength); + totalContentRead.addAndGet(readContentLength); + remainingContentLength -= readContentLength; + offset += readContentLength; + } + inputStream.close(); + } catch (IOException e) { + completionListener.onFailure(e); + } finally { + latch.countDown(); + } + }); + thread.start(); + } + try { + if (!latch.await(TRANSFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + throw new IOException("Timed out waiting for file transfer to complete for " + writeContext.getFileName()); + } + } catch (InterruptedException e) { + throw new IOException("Await interrupted on CountDownLatch, transfer failed for " + writeContext.getFileName()); + } + try (OutputStream outputStream = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) { + outputStream.write(buffer); + } + if (writeContext.getFileSize() != totalContentRead.get()) { + throw new IOException( + "Incorrect content length read for file " + + writeContext.getFileName() + + ", actual file size: " + + writeContext.getFileSize() + + ", bytes read: " + + totalContentRead.get() + ); + } + + try { + // bulks need to succeed for segment files to be generated + if (isSegmentFile(writeContext.getFileName()) && triggerDataIntegrityFailure) { + completionListener.onFailure( + new RuntimeException( + new CorruptIndexException( + "Data integrity check failure for file: " + writeContext.getFileName(), + writeContext.getFileName() + ) + ) + ); + } else { + writeContext.getUploadFinalizer().accept(true); + completionListener.onResponse(null); + } + } catch (Exception e) { + completionListener.onFailure(e); + } + + } + + private boolean isSegmentFile(String filename) { + return !filename.endsWith(".tlog") && !filename.endsWith(".ckp"); + } +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/VerifyingMultiStreamBlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/VerifyingMultiStreamBlobContainer.java new file mode 100644 index 0000000000000..0dfcc5c50e4b1 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/VerifyingMultiStreamBlobContainer.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore; + +import org.opensearch.action.ActionListener; +import org.opensearch.common.blobstore.stream.write.WriteContext; + +import java.io.IOException; + +/** + * An extension of {@link BlobContainer} that adds {@link VerifyingMultiStreamBlobContainer#asyncBlobUpload} to allow + * multipart uploads and performs integrity checks on transferred files + * + * @opensearch.internal + */ +public interface VerifyingMultiStreamBlobContainer extends BlobContainer { + + /** + * Reads blob content from multiple streams, each from a specific part of the file, which is provided by the + * StreamContextSupplier in the WriteContext passed to this method. An {@link IOException} is thrown if reading + * any of the input streams fails, or writing to the target blob fails + * + * @param writeContext A WriteContext object encapsulating all information needed to perform the upload + * @param completionListener Listener on which upload events should be published. + * @throws IOException if any of the input streams could not be read, or the target blob could not be written to + */ + void asyncBlobUpload(WriteContext writeContext, ActionListener completionListener) throws IOException; +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java index 7864c3ab5c794..ca744efae902d 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java @@ -160,7 +160,7 @@ private LocalStreamSupplier getMultipartStreamSupplier( : offsetRangeInputStream; Objects.requireNonNull(inputStreams.get())[streamIdx] = inputStream; - return new InputStreamContainer(inputStream, size); + return new InputStreamContainer(inputStream, size, position); } catch (IOException e) { log.error("Failed to create input stream", e); throw e; diff --git a/server/src/main/java/org/opensearch/common/util/ByteUtils.java b/server/src/main/java/org/opensearch/common/util/ByteUtils.java index 36ae3b1f5bcaa..8c7665d991751 100644 --- a/server/src/main/java/org/opensearch/common/util/ByteUtils.java +++ b/server/src/main/java/org/opensearch/common/util/ByteUtils.java @@ -61,6 +61,16 @@ public static void writeLongLE(long l, byte[] arr, int offset) { assert l == 0; } + /** Convert long to a byte array in big-endian format */ + public static byte[] toByteArrayBE(long l) { + byte[] result = new byte[8]; + for (int i = 7; i >= 0; i--) { + result[i] = (byte) (l & 0xffL); + l >>= 8; + } + return result; + } + /** Write a long in little-endian format. */ public static long readLongLE(byte[] arr, int offset) { long l = arr[offset++] & 0xFFL; diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 46d52bc8ca5df..e087bbb265727 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.search.ReferenceManager; @@ -18,11 +19,14 @@ import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; +import org.opensearch.action.ActionListener; +import org.opensearch.action.LatchedActionListener; import org.opensearch.action.bulk.BackoffPolicy; -import org.opensearch.common.CheckedFunction; +import org.opensearch.action.support.GroupedActionListener; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.logging.Loggers; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.UploadListener; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.index.engine.EngineException; import org.opensearch.index.engine.InternalEngine; @@ -46,6 +50,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -110,7 +115,7 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres private final SegmentReplicationCheckpointPublisher checkpointPublisher; - private final FileUploader fileUploader; + private final UploadListener statsListener; public RemoteStoreRefreshListener( IndexShard indexShard, @@ -137,7 +142,7 @@ public RemoteStoreRefreshListener( this.segmentTracker = segmentTracker; resetBackOffDelayIterator(); this.checkpointPublisher = checkpointPublisher; - this.fileUploader = new FileUploader(new UploadTracker() { + this.statsListener = new UploadListener() { @Override public void beforeUpload(String file) { // Start tracking the upload bytes started @@ -156,7 +161,7 @@ public void onFailure(String file) { // Track upload failure segmentTracker.addUploadBytesFailed(latestFileNameSizeOnLocalMap.get(file)); } - }, remoteDirectory, storeDirectory, this::getChecksumOfLocalFile, logger); + }; } @Override @@ -190,7 +195,7 @@ private synchronized void syncSegments(boolean isRetry) { long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshClockTimeMs = segmentTracker.getLocalRefreshClockTimeMs(); long refreshSeqNo = segmentTracker.getLocalRefreshSeqNo(); long bytesBeforeUpload = segmentTracker.getUploadBytesSucceeded(), startTimeInNS = System.nanoTime(); - boolean shouldRetry = true; + try { if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { @@ -242,18 +247,51 @@ private synchronized void syncSegments(boolean isRetry) { // Create a map of file name to size and update the refresh segment tracker updateLocalSizeMapAndTracker(localSegmentsPostRefresh); + CountDownLatch latch = new CountDownLatch(1); + ActionListener segmentUploadsCompletedListener = new LatchedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(Void unused) { + boolean shouldRetry = true; + try { + // Start metadata file upload + uploadMetadata(localSegmentsPostRefresh, segmentInfos); + clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh); + onSuccessfulSegmentsSync( + refreshTimeMs, + refreshClockTimeMs, + refreshSeqNo, + lastRefreshedCheckpoint, + checkpoint + ); + // At this point since we have uploaded new segments, segment infos and segment metadata file, + // along with marking minSeqNoToKeep, upload has succeeded completely. + shouldRetry = false; + } catch (Exception e) { + // We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried + // in the next refresh. This should not affect durability of the indexed data after remote trans-log + // integration. + logger.warn("Exception in post new segment upload actions", e); + } finally { + doComplete(shouldRetry); + } + } + + @Override + public void onFailure(Exception e) { + logger.warn("Exception while uploading new segments to the remote segment store", e); + doComplete(true); + } + + private void doComplete(boolean shouldRetry) { + // Update the segment tracker with the final upload status as seen at the end + updateFinalUploadStatusInSegmentTracker(shouldRetry == false, bytesBeforeUpload, startTimeInNS); + afterSegmentsSync(isRetry, shouldRetry); + } + }, latch); // Start the segments files upload - boolean newSegmentsUploadStatus = uploadNewSegments(localSegmentsPostRefresh); - if (newSegmentsUploadStatus) { - // Start metadata file upload - uploadMetadata(localSegmentsPostRefresh, segmentInfos); - clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh); - onSuccessfulSegmentsSync(refreshTimeMs, refreshClockTimeMs, refreshSeqNo, lastRefreshedCheckpoint, checkpoint); - // At this point since we have uploaded new segments, segment infos and segment metadata file, - // along with marking minSeqNoToKeep, upload has succeeded completely. - shouldRetry = false; - } + uploadNewSegments(localSegmentsPostRefresh, segmentUploadsCompletedListener); + latch.await(); } } catch (EngineException e) { logger.warn("Exception while reading SegmentInfosSnapshot", e); @@ -265,11 +303,7 @@ private synchronized void syncSegments(boolean isRetry) { } } catch (Throwable t) { logger.error("Exception in RemoteStoreRefreshListener.afterRefresh()", t); - } finally { - // Update the segment tracker with the final upload status as seen at the end - updateFinalUploadStatusInSegmentTracker(shouldRetry == false, bytesBeforeUpload, startTimeInNS); } - afterSegmentsSync(isRetry, shouldRetry); } /** @@ -378,17 +412,50 @@ void uploadMetadata(Collection localSegmentsPostRefresh, SegmentInfos se } } - private boolean uploadNewSegments(Collection localSegmentsPostRefresh) throws IOException { - AtomicBoolean uploadSuccess = new AtomicBoolean(true); - localSegmentsPostRefresh.forEach(file -> { - try { - fileUploader.uploadFile(file); - } catch (IOException e) { - uploadSuccess.set(false); - logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e); - } - }); - return uploadSuccess.get(); + private void uploadNewSegments(Collection localSegmentsPostRefresh, ActionListener listener) { + Collection filteredFiles = localSegmentsPostRefresh.stream().filter(file -> !skipUpload(file)).collect(Collectors.toList()); + if (filteredFiles.size() == 0) { + listener.onResponse(null); + return; + } + + ActionListener> mappedListener = ActionListener.map(listener, resp -> null); + GroupedActionListener batchUploadListener = new GroupedActionListener<>(mappedListener, filteredFiles.size()); + + for (String src : filteredFiles) { + ActionListener aggregatedListener = ActionListener.wrap(resp -> { + statsListener.onSuccess(src); + batchUploadListener.onResponse(resp); + }, ex -> { + logger.warn(() -> new ParameterizedMessage("Exception: [{}] while uploading segment files", ex), ex); + if (ex instanceof CorruptIndexException) { + indexShard.failShard(ex.getMessage(), ex); + } + statsListener.onFailure(src); + batchUploadListener.onFailure(ex); + }); + statsListener.beforeUpload(src); + remoteDirectory.copyFrom(storeDirectory, src, IOContext.DEFAULT, aggregatedListener); + } + } + + /** + * Whether to upload a file or not depending on whether file is in excluded list or has been already uploaded. + * + * @param file that needs to be uploaded. + * @return true if the upload has to be skipped for the file. + */ + private boolean skipUpload(String file) { + try { + // Exclude files that are already uploaded and the exclude files to come up with the list of files to be uploaded. + return EXCLUDE_FILES.contains(file) || remoteDirectory.containsFile(file, getChecksumOfLocalFile(file)); + } catch (IOException e) { + logger.error( + "Exception while reading checksum of local segment file: {}, ignoring the exception and re-uploading the file", + file + ); + } + return false; } private String getChecksumOfLocalFile(String file) throws IOException { @@ -462,104 +529,4 @@ private void updateFinalUploadStatusInSegmentTracker(boolean uploadStatus, long segmentTracker.incrementTotalUploadsFailed(); } } - - /** - * This class is a wrapper over the copying of file from local to remote store allowing to decorate the actual copy - * method along with adding hooks of code that can be run before, on success and on failure. - * - * @opensearch.internal - */ - private static class FileUploader { - - private final Logger logger; - - private final UploadTracker uploadTracker; - - private final RemoteSegmentStoreDirectory remoteDirectory; - - private final Directory storeDirectory; - - private final CheckedFunction checksumProvider; - - public FileUploader( - UploadTracker uploadTracker, - RemoteSegmentStoreDirectory remoteDirectory, - Directory storeDirectory, - CheckedFunction checksumProvider, - Logger logger - ) { - this.uploadTracker = uploadTracker; - this.remoteDirectory = remoteDirectory; - this.storeDirectory = storeDirectory; - this.checksumProvider = checksumProvider; - this.logger = logger; - } - - /** - * Calling this method will lead to before getting executed and then the actual upload. Based on the upload status, - * the onSuccess or onFailure method gets invoked. - * - * @param file the file which is to be uploaded. - * @throws IOException is thrown if the upload fails. - */ - private void uploadFile(String file) throws IOException { - if (skipUpload(file)) { - return; - } - uploadTracker.beforeUpload(file); - boolean success = false; - try { - performUpload(file); - uploadTracker.onSuccess(file); - success = true; - } finally { - if (!success) { - uploadTracker.onFailure(file); - } - } - } - - /** - * Whether to upload a file or not depending on whether file is in excluded list or has been already uploaded. - * - * @param file that needs to be uploaded. - * @return true if the upload has to be skipped for the file. - */ - private boolean skipUpload(String file) { - try { - // Exclude files that are already uploaded and the exclude files to come up with the list of files to be uploaded. - return EXCLUDE_FILES.contains(file) || remoteDirectory.containsFile(file, checksumProvider.apply(file)); - } catch (IOException e) { - logger.error( - "Exception while reading checksum of local segment file: {}, ignoring the exception and re-uploading the file", - file - ); - } - return false; - } - - /** - * This method does the actual upload. - * - * @param file that needs to be uploaded. - * @throws IOException is thrown if the upload fails. - */ - private void performUpload(String file) throws IOException { - remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT); - } - } - - /** - * A tracker class that is fed to FileUploader. - * - * @opensearch.internal - */ - interface UploadTracker { - - void beforeUpload(String file); - - void onSuccess(String file); - - void onFailure(String file); - } } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index 8782808c070ab..f7fe7ca62e6ba 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -45,6 +45,10 @@ public class RemoteDirectory extends Directory { protected final BlobContainer blobContainer; + public BlobContainer getBlobContainer() { + return blobContainer; + } + public RemoteDirectory(BlobContainer blobContainer) { this.blobContainer = blobContainer; } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index e7602203440d2..395ecba442e86 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -8,9 +8,12 @@ package org.opensearch.index.store; +import com.jcraft.jzlib.JZlib; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.ByteBuffersDataOutput; import org.apache.lucene.store.ByteBuffersIndexOutput; @@ -19,10 +22,20 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; +import org.opensearch.ExceptionsHelper; +import org.opensearch.action.ActionListener; import org.opensearch.common.UUIDs; +import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer; +import org.opensearch.common.blobstore.exception.CorruptFileException; +import org.opensearch.common.blobstore.stream.write.WriteContext; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.common.util.ByteUtils; import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.store.exception.ChecksumCombinationException; import org.opensearch.index.store.lockmanager.FileLockInfo; import org.opensearch.index.store.lockmanager.RemoteStoreCommitLevelLockManager; import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; @@ -44,6 +57,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import java.util.zip.CRC32; /** * A RemoteDirectory extension for remote segment store. We need to make sure we don't overwrite a segment file once uploaded. @@ -62,6 +76,11 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement */ public static final String SEGMENT_NAME_UUID_SEPARATOR = "__"; + /** + * Number of bytes in the segment file to store checksum + */ + private static final int SEGMENT_CHECKSUM_BYTES = 8; + /** * remoteDataDirectory is used to store segment files at path: cluster_UUID/index_UUID/shardId/segments/data */ @@ -349,6 +368,89 @@ public IndexInput openInput(String name, IOContext context) throws IOException { } } + /** + * Copies a file from the source directory to a remote based on multi-stream upload support. + * If vendor plugin supports uploading multiple parts in parallel, BlobContainer#writeBlobByStreams + * will be used, else, the legacy {@link RemoteSegmentStoreDirectory#copyFrom(Directory, String, String, IOContext)} + * will be called. + * + * @param from The directory for the file to be uploaded + * @param src File to be uploaded + * @param context IOContext to be used to open IndexInput of file during remote upload + * @param listener Listener to handle upload callback events + */ + public void copyFrom(Directory from, String src, IOContext context, ActionListener listener) { + if (remoteDataDirectory.getBlobContainer() instanceof VerifyingMultiStreamBlobContainer) { + try { + String remoteFilename = getNewRemoteSegmentFilename(src); + uploadBlob(from, src, remoteFilename, context, listener); + } catch (Exception e) { + listener.onFailure(e); + } + } else { + try { + copyFrom(from, src, src, context); + listener.onResponse(null); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", src), e); + listener.onFailure(e); + } + } + } + + private void uploadBlob(Directory from, String src, String remoteFileName, IOContext ioContext, ActionListener listener) + throws Exception { + long expectedChecksum = calculateChecksumOfChecksum(from, src); + long contentLength; + try (IndexInput indexInput = from.openInput(src, ioContext)) { + contentLength = indexInput.length(); + } + RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + src, + remoteFileName, + contentLength, + true, + WritePriority.NORMAL, + (size, position) -> new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position), + expectedChecksum, + remoteDataDirectory.getBlobContainer() instanceof VerifyingMultiStreamBlobContainer + ); + ActionListener completionListener = ActionListener.wrap(resp -> { + try { + postUpload(from, src, remoteFileName, getChecksumOfLocalFile(from, src)); + listener.onResponse(null); + } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("Exception in segment postUpload for file [{}]", src), e); + listener.onFailure(e); + } + }, ex -> { + logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", src), ex); + IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(ex); + if (corruptIndexException != null) { + listener.onFailure(corruptIndexException); + return; + } + Throwable throwable = ExceptionsHelper.unwrap(ex, CorruptFileException.class); + if (throwable != null) { + CorruptFileException corruptFileException = (CorruptFileException) throwable; + listener.onFailure(new CorruptIndexException(corruptFileException.getMessage(), corruptFileException.getFileName())); + return; + } + listener.onFailure(ex); + }); + + completionListener = ActionListener.runBefore(completionListener, () -> { + try { + remoteTransferContainer.close(); + } catch (Exception e) { + logger.warn("Error occurred while closing streams", e); + } + }); + + WriteContext writeContext = remoteTransferContainer.createWriteContext(); + ((VerifyingMultiStreamBlobContainer) remoteDataDirectory.getBlobContainer()).asyncBlobUpload(writeContext, completionListener); + } + /** * This acquires a lock on a given commit by creating a lock file in lock directory using {@code FileLockInfo} * @param primaryTerm Primary Term of index at the time of commit. @@ -425,6 +527,10 @@ public void copyFrom(Directory from, String src, String dest, IOContext context, String remoteFilename; remoteFilename = getNewRemoteSegmentFilename(dest); remoteDataDirectory.copyFrom(from, src, remoteFilename, context); + postUpload(from, src, remoteFilename, checksum); + } + + private void postUpload(Directory from, String src, String remoteFilename, String checksum) throws IOException { UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum, from.fileLength(src)); segmentsUploadedToRemoteStore.put(src, segmentMetadata); } @@ -528,6 +634,27 @@ private String getChecksumOfLocalFile(Directory directory, String file) throws I } } + private long calculateChecksumOfChecksum(Directory directory, String file) throws IOException { + try (IndexInput indexInput = directory.openInput(file, IOContext.DEFAULT)) { + long storedChecksum = CodecUtil.retrieveChecksum(indexInput); + CRC32 checksumOfChecksum = new CRC32(); + checksumOfChecksum.update(ByteUtils.toByteArrayBE(storedChecksum)); + try { + return JZlib.crc32_combine(storedChecksum, checksumOfChecksum.getValue(), SEGMENT_CHECKSUM_BYTES); + } catch (Exception e) { + throw new ChecksumCombinationException( + "Potentially corrupted file: Checksum combination failed while combining stored checksum " + + "and calculated checksum of stored checksum in segment file: " + + file + + ", directory: " + + directory, + file, + e + ); + } + } + } + private String getExistingRemoteFilename(String localFilename) { if (segmentsUploadedToRemoteStore.containsKey(localFilename)) { return segmentsUploadedToRemoteStore.get(localFilename).uploadedFilename; diff --git a/server/src/main/java/org/opensearch/index/store/exception/ChecksumCombinationException.java b/server/src/main/java/org/opensearch/index/store/exception/ChecksumCombinationException.java index a355473aa2afd..d8e1739fbaa9d 100644 --- a/server/src/main/java/org/opensearch/index/store/exception/ChecksumCombinationException.java +++ b/server/src/main/java/org/opensearch/index/store/exception/ChecksumCombinationException.java @@ -11,7 +11,7 @@ import org.apache.lucene.index.CorruptIndexException; /** - * Exception is raised when combination to two crc checksums fail. + * Exception is raised when combination of two CRC checksums fail. * * @opensearch.internal */ diff --git a/server/src/main/java/org/opensearch/index/translog/Checkpoint.java b/server/src/main/java/org/opensearch/index/translog/Checkpoint.java index 56de7e5daf55f..a9f905f52bc3a 100644 --- a/server/src/main/java/org/opensearch/index/translog/Checkpoint.java +++ b/server/src/main/java/org/opensearch/index/translog/Checkpoint.java @@ -233,7 +233,7 @@ public static void write(FileChannel fileChannel, Path checkpointFile, Checkpoin } } - private static byte[] createCheckpointBytes(Path checkpointFile, Checkpoint checkpoint) throws IOException { + public static byte[] createCheckpointBytes(Path checkpointFile, Checkpoint checkpoint) throws IOException { final ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(V4_FILE_SIZE) { @Override public synchronized byte[] toByteArray() { diff --git a/server/src/main/java/org/opensearch/index/translog/checked/TranslogCheckedContainer.java b/server/src/main/java/org/opensearch/index/translog/TranslogCheckedContainer.java similarity index 74% rename from server/src/main/java/org/opensearch/index/translog/checked/TranslogCheckedContainer.java rename to server/src/main/java/org/opensearch/index/translog/TranslogCheckedContainer.java index b90794e29d2b1..7e2a38559166f 100644 --- a/server/src/main/java/org/opensearch/index/translog/checked/TranslogCheckedContainer.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogCheckedContainer.java @@ -6,13 +6,10 @@ * compatible open source license. */ -package org.opensearch.index.translog.checked; +package org.opensearch.index.translog; -import org.opensearch.common.io.Channels; import org.opensearch.common.util.concurrent.ReleasableLock; -import java.io.IOException; -import java.nio.channels.FileChannel; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import java.util.zip.CRC32; @@ -28,21 +25,15 @@ public class TranslogCheckedContainer { private final Checksum checksum; private final AtomicLong contentLength; private final ReleasableLock updateLock = new ReleasableLock(new ReentrantLock()); - private final String file; /** - * Creates TranslogCheckedContainer from provided channel. + * Create TranslogCheckedContainer from provided bytes * - * @param channel {@link FileChannel} to read from - * @param offset offset of channel from which bytes are to be read. - * @param len Length of bytes to be read. + * @param bytes The byte array to read from */ - public TranslogCheckedContainer(FileChannel channel, int offset, int len, String file) throws IOException { + public TranslogCheckedContainer(byte[] bytes) { this.checksum = new CRC32(); this.contentLength = new AtomicLong(); - this.file = file; - - byte[] bytes = Channels.readFromFileChannel(channel, offset, len); updateFromBytes(bytes, 0, bytes.length); } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java b/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java index af6ebcf7b7c66..8067cccb772a2 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java @@ -46,6 +46,7 @@ import java.io.EOFException; import java.io.IOException; +import java.io.OutputStream; import java.nio.channels.FileChannel; import java.nio.file.Path; @@ -213,12 +214,10 @@ private static void tryReportOldVersionError(final Path path, final FileChannel /** * Writes this header with the latest format into the file channel */ - void write(final FileChannel channel, boolean fsync) throws IOException { + void write(final OutputStream outputStream) throws IOException { // This output is intentionally not closed because closing it will close the FileChannel. @SuppressWarnings({ "IOResourceOpenedButNotSafelyClosed", "resource" }) - final BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput( - new OutputStreamStreamOutput(java.nio.channels.Channels.newOutputStream(channel)) - ); + final BufferedChecksumStreamOutput out = new BufferedChecksumStreamOutput(new OutputStreamStreamOutput(outputStream)); CodecUtil.writeHeader(new OutputStreamDataOutput(out), TRANSLOG_CODEC, CURRENT_VERSION); // Write uuid final BytesRef uuid = new BytesRef(translogUUID); @@ -229,6 +228,11 @@ void write(final FileChannel channel, boolean fsync) throws IOException { // Checksum header out.writeInt((int) out.getChecksum()); out.flush(); + } + + void write(final FileChannel channel, boolean fsync) throws IOException { + OutputStream outputStream = java.nio.channels.Channels.newOutputStream(channel); + write(outputStream); if (fsync == true) { channel.force(true); } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogReader.java b/server/src/main/java/org/opensearch/index/translog/TranslogReader.java index c4a4fb7a460a0..9ea3328587645 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogReader.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogReader.java @@ -33,6 +33,7 @@ package org.opensearch.index.translog; import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.common.Nullable; import org.opensearch.common.io.Channels; import org.opensearch.common.util.io.IOUtils; import org.opensearch.index.seqno.SequenceNumbers; @@ -59,6 +60,11 @@ public class TranslogReader extends BaseTranslogReader implements Closeable { private final Checkpoint checkpoint; protected final AtomicBoolean closed = new AtomicBoolean(false); + @Nullable + private final Long translogChecksum; + @Nullable + private final Long checkpointChecksum; + /** * Create a translog writer against the specified translog file channel. * @@ -67,11 +73,34 @@ public class TranslogReader extends BaseTranslogReader implements Closeable { * @param path the path to the translog * @param header the header of the translog file */ - TranslogReader(final Checkpoint checkpoint, final FileChannel channel, final Path path, final TranslogHeader header) { + TranslogReader( + final Checkpoint checkpoint, + final FileChannel channel, + final Path path, + final TranslogHeader header, + final Long translogChecksum + ) throws IOException { super(checkpoint.generation, channel, path, header); this.length = checkpoint.offset; this.totalOperations = checkpoint.numOps; this.checkpoint = checkpoint; + this.translogChecksum = translogChecksum; + this.checkpointChecksum = (translogChecksum != null) ? calculateCheckpointChecksum(checkpoint, path) : null; + } + + private static Long calculateCheckpointChecksum(Checkpoint checkpoint, Path path) throws IOException { + TranslogCheckedContainer checkpointCheckedContainer = new TranslogCheckedContainer( + Checkpoint.createCheckpointBytes(path.getParent().resolve(Translog.CHECKPOINT_FILE_NAME), checkpoint) + ); + return checkpointCheckedContainer.getChecksum(); + } + + public Long getTranslogChecksum() { + return translogChecksum; + } + + public Long getCheckpointChecksum() { + return checkpointChecksum; } /** @@ -87,7 +116,7 @@ public class TranslogReader extends BaseTranslogReader implements Closeable { public static TranslogReader open(final FileChannel channel, final Path path, final Checkpoint checkpoint, final String translogUUID) throws IOException { final TranslogHeader header = TranslogHeader.read(translogUUID, path, channel); - return new TranslogReader(checkpoint, channel, path, header); + return new TranslogReader(checkpoint, channel, path, header, null); } /** @@ -115,9 +144,9 @@ TranslogReader closeIntoTrimmedReader(long aboveSeqNo, ChannelFactory channelFac IOUtils.fsync(checkpointFile.getParent(), true); - newReader = new TranslogReader(newCheckpoint, channel, path, header); + newReader = new TranslogReader(newCheckpoint, channel, path, header, translogChecksum); } else { - newReader = new TranslogReader(checkpoint, channel, path, header); + newReader = new TranslogReader(checkpoint, channel, path, header, translogChecksum); } toCloseOnFailure = null; return newReader; diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java b/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java index e19aece60adc0..e7b08b1dda3d2 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java @@ -37,6 +37,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefIterator; +import org.opensearch.common.Nullable; import org.opensearch.common.SuppressForbidden; import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.bytes.BytesReference; @@ -54,6 +55,7 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.ShardId; +import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; @@ -110,6 +112,9 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { private final Map> seenSequenceNumbers; + @Nullable + private final TranslogCheckedContainer translogCheckedContainer; + private final Boolean remoteTranslogEnabled; private TranslogWriter( @@ -126,6 +131,7 @@ private TranslogWriter( final TragicExceptionHolder tragedy, final LongConsumer persistedSequenceNumberConsumer, final BigArrays bigArrays, + TranslogCheckedContainer translogCheckedContainer, Boolean remoteTranslogEnabled ) throws IOException { super(initialCheckpoint.generation, channel, path, header); @@ -151,6 +157,7 @@ private TranslogWriter( this.bigArrays = bigArrays; this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null; this.tragedy = tragedy; + this.translogCheckedContainer = translogCheckedContainer; this.remoteTranslogEnabled = remoteTranslogEnabled; } @@ -179,6 +186,12 @@ public static TranslogWriter create( checkpointChannel = channelFactory.open(checkpointFile, StandardOpenOption.WRITE); final TranslogHeader header = new TranslogHeader(translogUUID, primaryTerm); header.write(channel, !Boolean.TRUE.equals(remoteTranslogEnabled)); + TranslogCheckedContainer translogCheckedContainer = null; + if (Boolean.TRUE.equals(remoteTranslogEnabled)) { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + header.write(byteArrayOutputStream); + translogCheckedContainer = new TranslogCheckedContainer(byteArrayOutputStream.toByteArray()); + } final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint( header.sizeInBytes(), fileGeneration, @@ -214,6 +227,7 @@ public static TranslogWriter create( tragedy, persistedSequenceNumberConsumer, bigArrays, + translogCheckedContainer, remoteTranslogEnabled ); } catch (Exception exception) { @@ -438,7 +452,13 @@ public TranslogReader closeIntoReader() throws IOException { closeWithTragicEvent(ex); throw ex; } - return new TranslogReader(getLastSyncedCheckpoint(), channel, path, header); + return new TranslogReader( + getLastSyncedCheckpoint(), + channel, + path, + header, + (translogCheckedContainer != null) ? translogCheckedContainer.getChecksum() : null + ); } else { throw new AlreadyClosedException( "translog [" + getGeneration() + "] is already closed (path [" + path + "]", @@ -571,6 +591,9 @@ private void writeAndReleaseOps(ReleasableBytesReference toWrite) throws IOExcep while (currentBytesConsumed != current.length) { int nBytesToWrite = Math.min(current.length - currentBytesConsumed, ioBuffer.remaining()); ioBuffer.put(current.bytes, current.offset + currentBytesConsumed, nBytesToWrite); + if (translogCheckedContainer != null) { + translogCheckedContainer.updateFromBytes(current.bytes, current.offset + currentBytesConsumed, nBytesToWrite); + } currentBytesConsumed += nBytesToWrite; if (ioBuffer.hasRemaining() == false) { ioBuffer.flip(); diff --git a/server/src/main/java/org/opensearch/index/translog/checked/package-info.java b/server/src/main/java/org/opensearch/index/translog/checked/package-info.java deleted file mode 100644 index ddb235fdbedce..0000000000000 --- a/server/src/main/java/org/opensearch/index/translog/checked/package-info.java +++ /dev/null @@ -1,10 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/** Contains checksum related utilities for translog files */ -package org.opensearch.index.translog.checked; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index d9feb1a832681..974e8af42b939 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -16,12 +16,22 @@ import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer; +import org.opensearch.common.blobstore.stream.write.WriteContext; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeFileInputStream; +import org.opensearch.index.translog.ChannelFactory; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.io.InputStream; +import java.nio.channels.FileChannel; +import java.nio.file.StandardOpenOption; import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.Set; import static org.opensearch.common.blobstore.BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC; @@ -44,18 +54,18 @@ public BlobStoreTransferService(BlobStore blobStore, ThreadPool threadPool) { } @Override - public void uploadBlobAsync( - String threadpoolName, + public void uploadBlob( + String threadPoolName, final TransferFileSnapshot fileSnapshot, Iterable remoteTransferPath, - ActionListener listener + ActionListener listener, + WritePriority writePriority ) { assert remoteTransferPath instanceof BlobPath; BlobPath blobPath = (BlobPath) remoteTransferPath; - threadPool.executor(threadpoolName).execute(ActionRunnable.wrap(listener, l -> { - try (InputStream inputStream = fileSnapshot.inputStream()) { - blobStore.blobContainer(blobPath) - .writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true); + threadPool.executor(threadPoolName).execute(ActionRunnable.wrap(listener, l -> { + try { + uploadBlob(fileSnapshot, blobPath, writePriority); l.onResponse(fileSnapshot); } catch (Exception e) { logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), e); @@ -65,14 +75,84 @@ public void uploadBlobAsync( } @Override - public void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable remoteTransferPath) throws IOException { - assert remoteTransferPath instanceof BlobPath; + public void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable remoteTransferPath, WritePriority writePriority) + throws IOException { BlobPath blobPath = (BlobPath) remoteTransferPath; try (InputStream inputStream = fileSnapshot.inputStream()) { blobStore.blobContainer(blobPath).writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true); } } + @Override + public void uploadBlobs( + Set fileSnapshots, + final Map blobPaths, + ActionListener listener, + WritePriority writePriority + ) { + fileSnapshots.forEach(fileSnapshot -> { + BlobPath blobPath = blobPaths.get(fileSnapshot.getPrimaryTerm()); + if (!(blobStore.blobContainer(blobPath) instanceof VerifyingMultiStreamBlobContainer)) { + uploadBlob(ThreadPool.Names.TRANSLOG_TRANSFER, fileSnapshot, blobPath, listener, writePriority); + } else { + uploadBlob(fileSnapshot, listener, blobPath, writePriority); + } + }); + + } + + private void uploadBlob( + TransferFileSnapshot fileSnapshot, + ActionListener listener, + BlobPath blobPath, + WritePriority writePriority + ) { + + try { + ChannelFactory channelFactory = FileChannel::open; + long contentLength; + try (FileChannel channel = channelFactory.open(fileSnapshot.getPath(), StandardOpenOption.READ)) { + contentLength = channel.size(); + } + RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + fileSnapshot.getName(), + fileSnapshot.getName(), + contentLength, + true, + writePriority, + (size, position) -> new OffsetRangeFileInputStream(fileSnapshot.getPath(), size, position), + Objects.requireNonNull(fileSnapshot.getChecksum()), + blobStore.blobContainer(blobPath) instanceof VerifyingMultiStreamBlobContainer + ); + ActionListener completionListener = ActionListener.wrap(resp -> listener.onResponse(fileSnapshot), ex -> { + logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), ex); + listener.onFailure(new FileTransferException(fileSnapshot, ex)); + }); + + completionListener = ActionListener.runBefore(completionListener, () -> { + try { + remoteTransferContainer.close(); + } catch (Exception e) { + logger.warn("Error occurred while closing streams", e); + } + }); + + WriteContext writeContext = remoteTransferContainer.createWriteContext(); + ((VerifyingMultiStreamBlobContainer) blobStore.blobContainer(blobPath)).asyncBlobUpload(writeContext, completionListener); + + } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), e); + listener.onFailure(new FileTransferException(fileSnapshot, e)); + } finally { + try { + fileSnapshot.close(); + } catch (IOException e) { + logger.warn("Error while closing TransferFileSnapshot", e); + } + } + + } + @Override public InputStream downloadBlob(Iterable path, String fileName) throws IOException { return blobStore.blobContainer((BlobPath) path).readBlob(fileName); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java index 239ef7c3c9300..dcec94edd694f 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java @@ -107,10 +107,12 @@ public void close() throws IOException { public static class TransferFileSnapshot extends FileSnapshot { private final long primaryTerm; + private Long checksum; - public TransferFileSnapshot(Path path, long primaryTerm) throws IOException { + public TransferFileSnapshot(Path path, long primaryTerm, Long checksum) throws IOException { super(path); this.primaryTerm = primaryTerm; + this.checksum = checksum; } public TransferFileSnapshot(String name, byte[] content, long primaryTerm) throws IOException { @@ -118,6 +120,10 @@ public TransferFileSnapshot(String name, byte[] content, long primaryTerm) throw this.primaryTerm = primaryTerm; } + public Long getChecksum() { + return checksum; + } + public long getPrimaryTerm() { return primaryTerm; } @@ -148,8 +154,8 @@ public static final class TranslogFileSnapshot extends TransferFileSnapshot { private final long generation; - public TranslogFileSnapshot(long primaryTerm, long generation, Path path) throws IOException { - super(path, primaryTerm); + public TranslogFileSnapshot(long primaryTerm, long generation, Path path, Long checksum) throws IOException { + super(path, primaryTerm, checksum); this.generation = generation; } @@ -185,8 +191,9 @@ public static final class CheckpointFileSnapshot extends TransferFileSnapshot { private final long minTranslogGeneration; - public CheckpointFileSnapshot(long primaryTerm, long generation, long minTranslogGeneration, Path path) throws IOException { - super(path, primaryTerm); + public CheckpointFileSnapshot(long primaryTerm, long generation, long minTranslogGeneration, Path path, Long checksum) + throws IOException { + super(path, primaryTerm, checksum); this.minTranslogGeneration = minTranslogGeneration; this.generation = generation; } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java index 0e6496042e3d8..a240fd38cda11 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -10,11 +10,14 @@ import org.opensearch.action.ActionListener; import org.opensearch.common.blobstore.BlobMetadata; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import java.io.IOException; import java.io.InputStream; import java.util.List; +import java.util.Map; import java.util.Set; /** @@ -26,25 +29,40 @@ public interface TransferService { /** * Uploads the {@link TransferFileSnapshot} async, once the upload is complete the callback is invoked - * @param threadpoolName threadpool type which will be used to upload blobs asynchronously + * @param threadPoolName threadpool type which will be used to upload blobs asynchronously * @param fileSnapshot the file snapshot to upload * @param remotePath the remote path where upload should be made * @param listener the callback to be invoked once upload completes successfully/fails */ - void uploadBlobAsync( - String threadpoolName, + void uploadBlob( + String threadPoolName, final TransferFileSnapshot fileSnapshot, Iterable remotePath, - ActionListener listener + ActionListener listener, + WritePriority writePriority ); + /** + * Uploads multiple {@link TransferFileSnapshot}, once the upload is complete the callback is invoked + * @param fileSnapshots the file snapshots to upload + * @param blobPaths Primary term to {@link BlobPath} map + * @param listener the callback to be invoked once uploads complete successfully/fail + */ + void uploadBlobs( + Set fileSnapshots, + final Map blobPaths, + ActionListener listener, + WritePriority writePriority + ) throws Exception; + /** * Uploads the {@link TransferFileSnapshot} blob * @param fileSnapshot the file snapshot to upload * @param remotePath the remote path where upload should be made + * @param writePriority Priority by which content needs to be written. * @throws IOException the exception while transferring the data */ - void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable remotePath) throws IOException; + void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable remotePath, WritePriority writePriority) throws IOException; void deleteBlobs(Iterable path, List fileNames) throws IOException; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java index b34c2282e874f..10dec13c81e1a 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java @@ -145,8 +145,14 @@ public TranslogCheckpointTransferSnapshot build() throws IOException { Path checkpointPath = location.resolve(checkpointGenFileNameMapper.apply(readerGeneration)); generations.add(readerGeneration); translogTransferSnapshot.add( - new TranslogFileSnapshot(readerPrimaryTerm, readerGeneration, translogPath), - new CheckpointFileSnapshot(readerPrimaryTerm, checkpointGeneration, minTranslogGeneration, checkpointPath) + new TranslogFileSnapshot(readerPrimaryTerm, readerGeneration, translogPath, reader.getTranslogChecksum()), + new CheckpointFileSnapshot( + readerPrimaryTerm, + checkpointGeneration, + minTranslogGeneration, + checkpointPath, + reader.getCheckpointChecksum() + ) ); if (readerGeneration > highestGeneration) { highestGeneration = readerGeneration; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 54140226e3744..0c63a7ffe4cce 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -17,6 +17,7 @@ import org.opensearch.common.SetOnce; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.io.stream.BytesStreamOutput; @@ -119,14 +120,16 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans }), latch ); + Map blobPathMap = new HashMap<>(); toUpload.forEach( - fileSnapshot -> transferService.uploadBlobAsync( - ThreadPool.Names.TRANSLOG_TRANSFER, - fileSnapshot, - remoteDataTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())), - latchedActionListener + fileSnapshot -> blobPathMap.put( + fileSnapshot.getPrimaryTerm(), + remoteDataTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())) ) ); + + transferService.uploadBlobs(toUpload, blobPathMap, latchedActionListener, WritePriority.HIGH); + try { if (latch.await(TRANSFER_TIMEOUT_IN_MILLIS, TimeUnit.MILLISECONDS) == false) { Exception ex = new TimeoutException("Timed out waiting for transfer of snapshot " + transferSnapshot + " to complete"); @@ -139,7 +142,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans throw ex; } if (exceptionList.isEmpty()) { - transferService.uploadBlob(prepareMetadata(transferSnapshot), remoteMetadataTransferPath); + transferService.uploadBlob(prepareMetadata(transferSnapshot), remoteMetadataTransferPath, WritePriority.HIGH); translogTransferListener.onUploadComplete(transferSnapshot); return true; } else { diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index ebc68c288e25a..d9f73a9b41658 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -46,10 +46,10 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.OpenSearchExecutors; -import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.util.concurrent.XRejectedExecutionHandler; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.node.Node; diff --git a/server/src/test/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainerTests.java b/server/src/test/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainerTests.java index 1ebec46042247..48940a0d401fd 100644 --- a/server/src/test/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainerTests.java +++ b/server/src/test/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainerTests.java @@ -9,6 +9,7 @@ package org.opensearch.common.blobstore.transfer; import org.junit.Before; +import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.io.InputStreamContainer; import org.opensearch.common.StreamContext; import org.opensearch.common.blobstore.stream.write.WritePriority; @@ -21,6 +22,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.UUID; public class RemoteTransferContainerTests extends OpenSearchTestCase { @@ -140,6 +142,45 @@ public void testTypeOfProvidedStreamsAllCases() throws IOException { testTypeOfProvidedStreams(false); } + public void testCreateWriteContextAllCases() throws IOException { + testCreateWriteContext(true); + testCreateWriteContext(false); + } + + private void testCreateWriteContext(boolean doRemoteDataIntegrityCheck) throws IOException { + String remoteFileName = testFile.getFileName().toString() + UUID.randomUUID(); + Long expectedChecksum = randomLong(); + try ( + RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + testFile.getFileName().toString(), + remoteFileName, + TEST_FILE_SIZE_BYTES, + true, + WritePriority.HIGH, + new RemoteTransferContainer.OffsetRangeInputStreamSupplier() { + @Override + public OffsetRangeInputStream get(long size, long position) throws IOException { + return new OffsetRangeFileInputStream(testFile, size, position); + } + }, + expectedChecksum, + doRemoteDataIntegrityCheck + ) + ) { + WriteContext writeContext = remoteTransferContainer.createWriteContext(); + assertEquals(remoteFileName, writeContext.getFileName()); + assertTrue(writeContext.isFailIfAlreadyExists()); + assertEquals(TEST_FILE_SIZE_BYTES, writeContext.getFileSize()); + assertEquals(WritePriority.HIGH, writeContext.getWritePriority()); + assertEquals(doRemoteDataIntegrityCheck, writeContext.doRemoteDataIntegrityCheck()); + if (doRemoteDataIntegrityCheck) { + assertEquals(expectedChecksum, writeContext.getExpectedChecksum()); + } else { + assertNull(writeContext.getExpectedChecksum()); + } + } + } + private void testTypeOfProvidedStreams(boolean isRemoteDataIntegritySupported) throws IOException { try ( RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index c37893877253e..ea092fffa3a9a 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -23,8 +23,12 @@ import org.apache.lucene.tests.util.LuceneTestCase; import org.junit.After; import org.junit.Before; +import org.mockito.Mockito; +import org.opensearch.action.ActionListener; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.UUIDs; +import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer; +import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.io.stream.BytesStreamOutput; @@ -48,6 +52,10 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.HashMap; +import java.util.Collection; import java.util.concurrent.ExecutorService; import static org.mockito.Mockito.mock; @@ -68,6 +76,7 @@ public class RemoteSegmentStoreDirectoryTests extends IndexShardTestCase { private RemoteStoreMetadataLockManager mdLockManager; private RemoteSegmentStoreDirectory remoteSegmentStoreDirectory; + private TestUploadListener testUploadTracker; private IndexShard indexShard; private SegmentInfos segmentInfos; private ThreadPool threadPool; @@ -89,6 +98,7 @@ public void setup() throws IOException { mdLockManager, threadPool ); + testUploadTracker = new TestUploadListener(); Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build(); ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService(); @@ -503,6 +513,82 @@ public void testCopyFrom() throws IOException { storeDirectory.close(); } + public void testCopyFilesFromMultipart() throws Exception { + String filename = "_100.si"; + populateMetadata(); + remoteSegmentStoreDirectory.init(); + + Directory storeDirectory = LuceneTestCase.newDirectory(); + IndexOutput indexOutput = storeDirectory.createOutput(filename, IOContext.DEFAULT); + indexOutput.writeString("Hello World!"); + CodecUtil.writeFooter(indexOutput); + indexOutput.close(); + storeDirectory.sync(List.of(filename)); + + assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); + + VerifyingMultiStreamBlobContainer blobContainer = mock(VerifyingMultiStreamBlobContainer.class); + when(remoteDataDirectory.getBlobContainer()).thenReturn(blobContainer); + Mockito.doAnswer(invocation -> { + ActionListener completionListener = invocation.getArgument(1); + completionListener.onResponse(null); + return null; + }).when(blobContainer).asyncBlobUpload(any(WriteContext.class), any()); + + CountDownLatch latch = new CountDownLatch(1); + ActionListener completionListener = new ActionListener() { + @Override + public void onResponse(Void unused) { + latch.countDown(); + } + + @Override + public void onFailure(Exception e) {} + }; + remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, IOContext.DEFAULT, completionListener); + assertTrue(latch.await(5000, TimeUnit.SECONDS)); + assertTrue(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); + storeDirectory.close(); + } + + public void testCopyFilesFromMultipartIOException() throws Exception { + String filename = "_100.si"; + populateMetadata(); + remoteSegmentStoreDirectory.init(); + + Directory storeDirectory = LuceneTestCase.newDirectory(); + IndexOutput indexOutput = storeDirectory.createOutput(filename, IOContext.DEFAULT); + indexOutput.writeString("Hello World!"); + CodecUtil.writeFooter(indexOutput); + indexOutput.close(); + storeDirectory.sync(List.of(filename)); + + assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); + + VerifyingMultiStreamBlobContainer blobContainer = mock(VerifyingMultiStreamBlobContainer.class); + when(remoteDataDirectory.getBlobContainer()).thenReturn(blobContainer); + Mockito.doAnswer(invocation -> { + ActionListener completionListener = invocation.getArgument(1); + completionListener.onFailure(new Exception("Test exception")); + return null; + }).when(blobContainer).asyncBlobUpload(any(WriteContext.class), any()); + CountDownLatch latch = new CountDownLatch(1); + ActionListener completionListener = new ActionListener<>() { + @Override + public void onResponse(Void unused) {} + + @Override + public void onFailure(Exception e) { + latch.countDown(); + } + }; + remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, IOContext.DEFAULT, completionListener); + assertTrue(latch.await(5000, TimeUnit.SECONDS)); + assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); + + storeDirectory.close(); + } + public void testCopyFromException() throws IOException { String filename = "_100.si"; Directory storeDirectory = LuceneTestCase.newDirectory(); diff --git a/server/src/test/java/org/opensearch/index/store/TestUploadListener.java b/server/src/test/java/org/opensearch/index/store/TestUploadListener.java new file mode 100644 index 0000000000000..a2a61a93371e8 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/TestUploadListener.java @@ -0,0 +1,43 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.opensearch.common.util.UploadListener; + +import java.util.concurrent.ConcurrentHashMap; + +public class TestUploadListener implements UploadListener { + + private final ConcurrentHashMap uploadStatusMap = new ConcurrentHashMap<>(); + + enum UploadStatus { + BEFORE_UPLOAD, + UPLOAD_SUCCESS, + UPLOAD_FAILURE + } + + @Override + public void beforeUpload(String file) { + uploadStatusMap.put(file, UploadStatus.BEFORE_UPLOAD); + } + + @Override + public void onSuccess(String file) { + uploadStatusMap.put(file, UploadStatus.UPLOAD_SUCCESS); + } + + @Override + public void onFailure(String file) { + uploadStatusMap.put(file, UploadStatus.UPLOAD_FAILURE); + } + + public UploadStatus getUploadStatus(String file) { + return uploadStatusMap.get(file); + } +} diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceMockRepositoryTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceMockRepositoryTests.java new file mode 100644 index 0000000000000..1175716679d0f --- /dev/null +++ b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceMockRepositoryTests.java @@ -0,0 +1,189 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.mockito.Mockito; +import org.opensearch.action.ActionListener; +import org.opensearch.action.LatchedActionListener; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer; +import org.opensearch.common.blobstore.stream.write.WriteContext; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.Collections; +import java.util.HashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class BlobStoreTransferServiceMockRepositoryTests extends OpenSearchTestCase { + + private ThreadPool threadPool; + + private BlobStore blobStore; + + @Override + public void setUp() throws Exception { + super.setUp(); + blobStore = mock(BlobStore.class); + threadPool = new TestThreadPool(getClass().getName()); + } + + public void testUploadBlobs() throws Exception { + Path testFile = createTempFile(); + Files.write(testFile, randomByteArrayOfLength(128), StandardOpenOption.APPEND); + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot( + testFile, + randomNonNegativeLong(), + 0L + ); + + VerifyingMultiStreamBlobContainer blobContainer = mock(VerifyingMultiStreamBlobContainer.class); + Mockito.doAnswer(invocation -> { + ActionListener completionListener = invocation.getArgument(1); + completionListener.onResponse(null); + return null; + }).when(blobContainer).asyncBlobUpload(any(WriteContext.class), any()); + when(blobStore.blobContainer(any(BlobPath.class))).thenReturn(blobContainer); + + TransferService transferService = new BlobStoreTransferService(blobStore, threadPool); + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean onResponseCalled = new AtomicBoolean(false); + AtomicReference exceptionRef = new AtomicReference<>(); + AtomicReference fileSnapshotRef = new AtomicReference<>(); + transferService.uploadBlobs(Collections.singleton(transferFileSnapshot), new HashMap<>() { + { + put(transferFileSnapshot.getPrimaryTerm(), new BlobPath().add("sample_path")); + } + }, new LatchedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(FileSnapshot.TransferFileSnapshot fileSnapshot) { + onResponseCalled.set(true); + fileSnapshotRef.set(fileSnapshot); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + } + }, latch), WritePriority.HIGH); + + assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); + verify(blobContainer).asyncBlobUpload(any(WriteContext.class), any()); + assertTrue(onResponseCalled.get()); + assertEquals(transferFileSnapshot.getPrimaryTerm(), fileSnapshotRef.get().getPrimaryTerm()); + assertEquals(transferFileSnapshot.getName(), fileSnapshotRef.get().getName()); + assertNull(exceptionRef.get()); + } + + public void testUploadBlobsIOException() throws Exception { + Path testFile = createTempFile(); + Files.write(testFile, randomByteArrayOfLength(128), StandardOpenOption.APPEND); + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot( + testFile, + randomNonNegativeLong(), + 0L + ); + + VerifyingMultiStreamBlobContainer blobContainer = mock(VerifyingMultiStreamBlobContainer.class); + doThrow(new IOException()).when(blobContainer).asyncBlobUpload(any(WriteContext.class), any()); + when(blobStore.blobContainer(any(BlobPath.class))).thenReturn(blobContainer); + + TransferService transferService = new BlobStoreTransferService(blobStore, threadPool); + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean onResponseCalled = new AtomicBoolean(false); + AtomicReference exceptionRef = new AtomicReference<>(); + transferService.uploadBlobs(Collections.singleton(transferFileSnapshot), new HashMap<>() { + { + put(transferFileSnapshot.getPrimaryTerm(), new BlobPath().add("sample_path")); + } + }, new LatchedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(FileSnapshot.TransferFileSnapshot fileSnapshot) { + onResponseCalled.set(true); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + } + }, latch), WritePriority.HIGH); + + assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); + verify(blobContainer).asyncBlobUpload(any(WriteContext.class), any()); + assertFalse(onResponseCalled.get()); + assertTrue(exceptionRef.get() instanceof FileTransferException); + } + + public void testUploadBlobsUploadFutureCompletedExceptionally() throws Exception { + Path testFile = createTempFile(); + Files.write(testFile, randomByteArrayOfLength(128), StandardOpenOption.APPEND); + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot( + testFile, + randomNonNegativeLong(), + 0L + ); + + VerifyingMultiStreamBlobContainer blobContainer = mock(VerifyingMultiStreamBlobContainer.class); + Mockito.doAnswer(invocation -> { + ActionListener completionListener = invocation.getArgument(1); + completionListener.onFailure(new Exception("Test exception")); + return null; + }).when(blobContainer).asyncBlobUpload(any(WriteContext.class), any()); + + when(blobStore.blobContainer(any(BlobPath.class))).thenReturn(blobContainer); + + TransferService transferService = new BlobStoreTransferService(blobStore, threadPool); + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean onResponseCalled = new AtomicBoolean(false); + AtomicReference exceptionRef = new AtomicReference<>(); + LatchedActionListener listener = new LatchedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(FileSnapshot.TransferFileSnapshot fileSnapshot) { + onResponseCalled.set(true); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + } + }, latch); + transferService.uploadBlobs(Collections.singleton(transferFileSnapshot), new HashMap<>() { + { + put(transferFileSnapshot.getPrimaryTerm(), new BlobPath().add("sample_path")); + } + }, listener, WritePriority.HIGH); + + assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); + verify(blobContainer).asyncBlobUpload(any(WriteContext.class), any()); + assertFalse(onResponseCalled.get()); + assertTrue(exceptionRef.get() instanceof FileTransferException); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } +} diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java index 196fbd58c2c20..5502dc3089c62 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java @@ -12,6 +12,7 @@ import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.env.Environment; @@ -49,9 +50,13 @@ public void setUp() throws Exception { public void testUploadBlob() throws IOException { Path testFile = createTempFile(); Files.write(testFile, randomByteArrayOfLength(128), StandardOpenOption.APPEND); - FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot(testFile, randomNonNegativeLong()); + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot( + testFile, + randomNonNegativeLong(), + null + ); TransferService transferService = new BlobStoreTransferService(repository.blobStore(), threadPool); - transferService.uploadBlob(transferFileSnapshot, repository.basePath()); + transferService.uploadBlob(transferFileSnapshot, repository.basePath(), WritePriority.HIGH); } public void testUploadBlobFromByteArray() throws IOException { @@ -61,17 +66,21 @@ public void testUploadBlobFromByteArray() throws IOException { 1 ); TransferService transferService = new BlobStoreTransferService(repository.blobStore(), threadPool); - transferService.uploadBlob(transferFileSnapshot, repository.basePath()); + transferService.uploadBlob(transferFileSnapshot, repository.basePath(), WritePriority.NORMAL); } public void testUploadBlobAsync() throws IOException, InterruptedException { Path testFile = createTempFile(); Files.write(testFile, randomByteArrayOfLength(128), StandardOpenOption.APPEND); AtomicBoolean succeeded = new AtomicBoolean(false); - FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot(testFile, randomNonNegativeLong()); + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot( + testFile, + randomNonNegativeLong(), + null + ); CountDownLatch latch = new CountDownLatch(1); TransferService transferService = new BlobStoreTransferService(repository.blobStore(), threadPool); - transferService.uploadBlobAsync( + transferService.uploadBlob( ThreadPool.Names.TRANSLOG_TRANSFER, transferFileSnapshot, repository.basePath(), @@ -87,7 +96,8 @@ public void onResponse(FileSnapshot.TransferFileSnapshot fileSnapshot) { public void onFailure(Exception e) { throw new AssertionError("Failed to perform uploadBlobAsync", e); } - }, latch) + }, latch), + WritePriority.HIGH ); assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); assertTrue(succeeded.get()); diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/FileSnapshotTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/FileSnapshotTests.java index 6d2fb3794b107..8d07af5927135 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/FileSnapshotTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/FileSnapshotTests.java @@ -28,15 +28,15 @@ public void tearDown() throws Exception { public void testFileSnapshotPath() throws IOException { Path file = createTempFile(); Files.writeString(file, "hello"); - fileSnapshot = new FileSnapshot.TransferFileSnapshot(file, 12); + fileSnapshot = new FileSnapshot.TransferFileSnapshot(file, 12, null); assertFileSnapshotProperties(file); - try (FileSnapshot sameFileSnapshot = new FileSnapshot.TransferFileSnapshot(file, 12)) { + try (FileSnapshot sameFileSnapshot = new FileSnapshot.TransferFileSnapshot(file, 12, null)) { assertEquals(sameFileSnapshot, fileSnapshot); } - try (FileSnapshot sameFileDiffPTSnapshot = new FileSnapshot.TransferFileSnapshot(file, 34)) { + try (FileSnapshot sameFileDiffPTSnapshot = new FileSnapshot.TransferFileSnapshot(file, 34, null)) { assertNotEquals(sameFileDiffPTSnapshot, fileSnapshot); } } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java index be14e4a7bd380..fd0d44564ef6b 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/FileTransferTrackerTests.java @@ -34,7 +34,8 @@ public void testOnSuccess() throws IOException { try ( FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot( testFile, - randomNonNegativeLong() + randomNonNegativeLong(), + null ) ) { fileTransferTracker.onSuccess(transferFileSnapshot); @@ -58,11 +59,13 @@ public void testOnFailure() throws IOException { try ( FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot( testFile, - randomNonNegativeLong() + randomNonNegativeLong(), + null ); FileSnapshot.TransferFileSnapshot transferFileSnapshot2 = new FileSnapshot.TransferFileSnapshot( testFile2, - randomNonNegativeLong() + randomNonNegativeLong(), + null ) ) { @@ -82,7 +85,8 @@ public void testUploaded() throws IOException { try ( FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot( testFile, - randomNonNegativeLong() + randomNonNegativeLong(), + null ); ) { diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 5f8aa64457896..66cd257299e25 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -17,6 +17,7 @@ import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.support.PlainBlobMetadata; +import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.index.Index; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.Translog; @@ -41,6 +42,8 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; @@ -80,20 +83,24 @@ public void tearDown() throws Exception { } @SuppressWarnings("unchecked") - public void testTransferSnapshot() throws IOException { + public void testTransferSnapshot() throws Exception { AtomicInteger fileTransferSucceeded = new AtomicInteger(); AtomicInteger fileTransferFailed = new AtomicInteger(); AtomicInteger translogTransferSucceeded = new AtomicInteger(); AtomicInteger translogTransferFailed = new AtomicInteger(); doNothing().when(transferService) - .uploadBlob(any(TransferFileSnapshot.class), Mockito.eq(remoteBaseTransferPath.add(String.valueOf(primaryTerm)))); + .uploadBlob( + any(TransferFileSnapshot.class), + Mockito.eq(remoteBaseTransferPath.add(String.valueOf(primaryTerm))), + any(WritePriority.class) + ); doAnswer(invocationOnMock -> { - ActionListener listener = (ActionListener) invocationOnMock.getArguments()[3]; - listener.onResponse((TransferFileSnapshot) invocationOnMock.getArguments()[1]); + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + Set transferFileSnapshots = (Set) invocationOnMock.getArguments()[0]; + transferFileSnapshots.forEach(listener::onResponse); return null; - }).when(transferService) - .uploadBlobAsync(any(String.class), any(TransferFileSnapshot.class), any(BlobPath.class), any(ActionListener.class)); + }).when(transferService).uploadBlobs(anySet(), anyMap(), any(ActionListener.class), any(WritePriority.class)); FileTransferTracker fileTransferTracker = new FileTransferTracker(new ShardId("index", "indexUUid", 0)) { @Override @@ -145,13 +152,15 @@ public Set getCheckpointFileSnapshots() { primaryTerm, generation, minTranslogGeneration, - createTempFile(Translog.TRANSLOG_FILE_PREFIX + generation, Translog.CHECKPOINT_SUFFIX) + createTempFile(Translog.TRANSLOG_FILE_PREFIX + generation, Translog.CHECKPOINT_SUFFIX), + null ), new CheckpointFileSnapshot( primaryTerm, generation, minTranslogGeneration, - createTempFile(Translog.TRANSLOG_FILE_PREFIX + (generation - 1), Translog.CHECKPOINT_SUFFIX) + createTempFile(Translog.TRANSLOG_FILE_PREFIX + (generation - 1), Translog.CHECKPOINT_SUFFIX), + null ) ); } catch (IOException e) { @@ -166,12 +175,14 @@ public Set getTranslogFileSnapshots() { new TranslogFileSnapshot( primaryTerm, generation, - createTempFile(Translog.TRANSLOG_FILE_PREFIX + generation, Translog.TRANSLOG_FILE_SUFFIX) + createTempFile(Translog.TRANSLOG_FILE_PREFIX + generation, Translog.TRANSLOG_FILE_SUFFIX), + null ), new TranslogFileSnapshot( primaryTerm, generation - 1, - createTempFile(Translog.TRANSLOG_FILE_PREFIX + (generation - 1), Translog.TRANSLOG_FILE_SUFFIX) + createTempFile(Translog.TRANSLOG_FILE_PREFIX + (generation - 1), Translog.TRANSLOG_FILE_SUFFIX), + null ) ); } catch (IOException e) {