From b3a50d8482da5e0a89df669cfaa6a26a2a790272 Mon Sep 17 00:00:00 2001
From: Sandeep Kumawat <2025sandeepkumawat@gmail.com>
Date: Tue, 14 May 2024 21:01:08 +0530
Subject: [PATCH] add integration tests

Signed-off-by: Sandeep Kumawat <2025sandeepkumawat@gmail.com>
---
 .../RemoteStoreTranslogMetadataIT.java        | 182 +++++++++++++++
 .../RemoteStoreTranslogMetadataRestoreIT.java |  58 +++++
 .../MockFsMetadataSupportedBlobContainer.java | 209 ++++++++++++++++++
 .../MockFsMetadataSupportedBlobStore.java     |  44 ++++
 .../MockFsMetadataSupportedRepository.java    |  51 +++++
 ...ckFsMetadataSupportedRepositoryPlugin.java |  38 ++++
 .../common/blobstore/FetchBlobResult.java     |  10 +-
 .../transfer/TranslogTransferManager.java     |  12 +-
 8 files changed, 596 insertions(+), 8 deletions(-)
 create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/RemoteStoreTranslogMetadataIT.java
 create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/RemoteStoreTranslogMetadataRestoreIT.java
 create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedBlobContainer.java
 create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedBlobStore.java
 create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedRepository.java
 create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedRepositoryPlugin.java

diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/RemoteStoreTranslogMetadataIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/RemoteStoreTranslogMetadataIT.java
new file mode 100644
index 0000000000000..a66fb64b3ea24
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/RemoteStoreTranslogMetadataIT.java
@@ -0,0 +1,182 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.remotestore.translogmetadata;
+
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.cluster.metadata.RepositoryMetadata;
+import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.common.blobstore.BlobPath;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.core.common.unit.ByteSizeUnit;
+import org.opensearch.plugins.Plugin;
+import org.opensearch.remotestore.RemoteStoreIT;
+import org.opensearch.remotestore.translogmetadata.mocks.MockFsMetadataSupportedRepositoryPlugin;
+import org.opensearch.repositories.blobstore.BlobStoreRepository;
+import org.opensearch.test.OpenSearchIntegTestCase;
+import org.opensearch.test.transport.MockTransportService;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
+import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
+import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
+import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
+
+@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
+public class RemoteStoreTranslogMetadataIT extends RemoteStoreIT {
+
+    protected final String INDEX_NAME = "remote-store-test-idx-1";
+    Path repositoryLocation;
+    boolean compress;
+    boolean overrideBuildRepositoryMetadata;
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return Arrays.asList(MockTransportService.TestPlugin.class, MockFsMetadataSupportedRepositoryPlugin.class);
+    }
+
+    @Before
+    public void setup() {
+        clusterSettingsSuppliedByTest = true;
+        overrideBuildRepositoryMetadata = false;
+        repositoryLocation = randomRepoPath();
+        compress = randomBoolean();
+    }
+
+    @Override
+    public RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String name) {
+        if (overrideBuildRepositoryMetadata) {
+            Map<String, String> nodeAttributes = node.getAttributes();
+            String type = nodeAttributes.get(String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, name));
+
+            String settingsAttributeKeyPrefix = String.format(
+                Locale.getDefault(),
+                REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
+                name
+            );
+            Map<String, String> settingsMap = node.getAttributes()
+                .keySet()
+                .stream()
+                .filter(key -> key.startsWith(settingsAttributeKeyPrefix))
+                .collect(Collectors.toMap(key -> key.replace(settingsAttributeKeyPrefix, ""), key -> node.getAttributes().get(key)));
+
+            Settings.Builder settings = Settings.builder();
+            settingsMap.entrySet().forEach(entry -> settings.put(entry.getKey(), entry.getValue()));
+            settings.put(BlobStoreRepository.SYSTEM_REPOSITORY_SETTING.getKey(), true);
+
+            if (name.equals(REPOSITORY_NAME)) {
+                settings.put("location", repositoryLocation)
+                    .put("compress", compress)
+                    .put("max_remote_upload_bytes_per_sec", "1kb")
+                    .put("chunk_size", 100, ByteSizeUnit.BYTES);
+                return new RepositoryMetadata(name, MockFsMetadataSupportedRepositoryPlugin.TYPE_MD, settings.build());
+            }
+            return new RepositoryMetadata(name, type, settings.build());
+        } else {
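+            // No override requested: delegate to the default repository metadata from the base test class.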
+            return super.buildRepositoryMetadata(node, name);
+        }
+
+    }
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal) {
+        return Settings.builder()
+            .put(super.nodeSettings(nodeOrdinal))
+            .put(
+                remoteStoreClusterSettings(
+                    REPOSITORY_NAME,
+                    segmentRepoPath,
+                    MockFsMetadataSupportedRepositoryPlugin.TYPE_MD,
+                    REPOSITORY_2_NAME,
+                    translogRepoPath,
+                    MockFsMetadataSupportedRepositoryPlugin.TYPE_MD
+                )
+            )
+            .build();
+    }
+
+    // Tests cleanup of local-only translog files that were never uploaded to the remote store (no metadata present in remote).
+    // Without the cleanup change in RemoteFsTranslog.createEmptyTranslog, this test fails with an NPE.
+    public void testLocalOnlyTranslogCleanupOnNodeRestart() throws Exception {
+        clusterSettingsSuppliedByTest = true;
+
+        // Overriding settings to use AsyncMultiStreamBlobContainer
+        Settings settings = Settings.builder()
+            .put(super.nodeSettings(1))
+            .put(
+                remoteStoreClusterSettings(
+                    REPOSITORY_NAME,
+                    segmentRepoPath,
+                    MockFsMetadataSupportedRepositoryPlugin.TYPE_MD,
+                    REPOSITORY_2_NAME,
+                    translogRepoPath,
+                    MockFsMetadataSupportedRepositoryPlugin.TYPE_MD
+                )
+            )
+            .build();
+
+        internalCluster().startClusterManagerOnlyNode(settings);
+        String dataNode = internalCluster().startDataOnlyNode(settings);
+
+        // 1. Create index with 0 replicas
+        createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1));
+        ensureGreen(INDEX_NAME);
+
+        // 2. Index docs
+        int searchableDocs = 0;
+        for (int i = 0; i < randomIntBetween(1, 5); i++) {
+            indexBulk(INDEX_NAME, 15);
+            refresh(INDEX_NAME);
+            searchableDocs += 15;
+        }
+        indexBulk(INDEX_NAME, 15);
+
+        assertHitCount(client(dataNode).prepareSearch(INDEX_NAME).setSize(0).get(), searchableDocs);
+
+        // 3. Delete metadata from remote translog
+        String indexUUID = client().admin()
+            .indices()
+            .prepareGetSettings(INDEX_NAME)
+            .get()
+            .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
+
+        String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA).buildAsString();
+        Path translogMetaDataPath = Path.of(translogRepoPath + "/" + shardPath);
+
+        try (Stream<Path> files = Files.list(translogMetaDataPath)) {
+            files.forEach(p -> {
+                try {
+                    Files.delete(p);
+                } catch (IOException e) {
+                    // Ignore
+                }
+            });
+        }
+
+        internalCluster().restartNode(dataNode);
+
+        ensureGreen(INDEX_NAME);
+
+        assertHitCount(client(dataNode).prepareSearch(INDEX_NAME).setSize(0).get(), searchableDocs);
+        indexBulk(INDEX_NAME, 15);
+        refresh(INDEX_NAME);
+        assertHitCount(client(dataNode).prepareSearch(INDEX_NAME).setSize(0).get(), searchableDocs + 15);
+    }
+
+}
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/RemoteStoreTranslogMetadataRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/RemoteStoreTranslogMetadataRestoreIT.java
new file mode 100644
index 0000000000000..29a7e1c73eea6
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/RemoteStoreTranslogMetadataRestoreIT.java
@@ -0,0 +1,58 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.remotestore.translogmetadata;
+
+import org.opensearch.common.settings.Settings;
+import org.opensearch.plugins.Plugin;
+import org.opensearch.remotestore.RemoteStoreRestoreIT;
+import org.opensearch.remotestore.translogmetadata.mocks.MockFsMetadataSupportedRepositoryPlugin;
+import org.opensearch.test.OpenSearchIntegTestCase;
+import org.opensearch.test.transport.MockTransportService;
+import org.junit.Before;
+
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+
+@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
+public class RemoteStoreTranslogMetadataRestoreIT extends RemoteStoreRestoreIT {
+    Path repositoryLocation;
+    boolean compress;
+    boolean overrideBuildRepositoryMetadata;
+
+    @Before
+    public void setup() {
+        clusterSettingsSuppliedByTest = true;
+        overrideBuildRepositoryMetadata = false;
+        repositoryLocation = randomRepoPath();
+        compress = randomBoolean();
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return Arrays.asList(MockTransportService.TestPlugin.class, MockFsMetadataSupportedRepositoryPlugin.class);
+    }
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal) {
+        return Settings.builder()
+            .put(super.nodeSettings(nodeOrdinal))
+            .put(
+                remoteStoreClusterSettings(
+                    REPOSITORY_NAME,
+                    segmentRepoPath,
+                    MockFsMetadataSupportedRepositoryPlugin.TYPE_MD,
+                    REPOSITORY_2_NAME,
+                    translogRepoPath,
+                    MockFsMetadataSupportedRepositoryPlugin.TYPE_MD
+                )
+            )
+            .build();
+    }
+}
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedBlobContainer.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedBlobContainer.java
new file mode 100644
index 0000000000000..9762b7d1336cb
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedBlobContainer.java
@@ -0,0 +1,209 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.remotestore.translogmetadata.mocks;
+
+import org.apache.lucene.index.CorruptIndexException;
+import org.opensearch.common.StreamContext;
+import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
+import org.opensearch.common.blobstore.BlobPath;
+import org.opensearch.common.blobstore.FetchBlobResult;
+import org.opensearch.common.blobstore.fs.FsBlobContainer;
+import org.opensearch.common.blobstore.fs.FsBlobStore;
+import org.opensearch.common.blobstore.stream.read.ReadContext;
+import org.opensearch.common.blobstore.stream.write.WriteContext;
+import org.opensearch.common.io.InputStreamContainer;
+import org.opensearch.core.action.ActionListener;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class MockFsMetadataSupportedBlobContainer extends FsBlobContainer implements AsyncMultiStreamBlobContainer {
+
+    private static final int TRANSFER_TIMEOUT_MILLIS = 30000;
+    private static final String CHECKPOINT_FILE_DATA_KEY = "ckp-data";
+
+    private final boolean triggerDataIntegrityFailure;
+
+    public MockFsMetadataSupportedBlobContainer(FsBlobStore blobStore, BlobPath blobPath, Path path, boolean triggerDataIntegrityFailure) {
+        super(blobStore, blobPath, path);
+        this.triggerDataIntegrityFailure = triggerDataIntegrityFailure;
+    }
+
+    @Override
+    public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException {
+
+        int nParts = 10;
+        long partSize = writeContext.getFileSize() / nParts;
+        StreamContext streamContext = writeContext.getStreamProvider(partSize);
+        final Path file = path.resolve(writeContext.getFileName());
+        byte[] buffer = new byte[(int) writeContext.getFileSize()];
+
+        // If the writeContext carries non-null metadata, persist the metadata content as a separate checkpoint file named after the translog file.
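+        // The checkpoint bytes arrive base64-encoded under the "ckp-data" key; readBlobWithMetadata
+        // below performs the inverse, re-reading the persisted .ckp blob back into the metadata map.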
+        if (writeContext.getMetadata() != null) {
+            String base64String = writeContext.getMetadata().get(CHECKPOINT_FILE_DATA_KEY);
+            byte[] decodedBytes = Base64.getDecoder().decode(base64String);
+            ByteArrayInputStream inputStream = new ByteArrayInputStream(decodedBytes);
+            int length = decodedBytes.length;
+            writeBlob(getCheckpointFileName(writeContext.getFileName()), inputStream, length, true);
+        }
+
+        AtomicLong totalContentRead = new AtomicLong();
+        CountDownLatch latch = new CountDownLatch(streamContext.getNumberOfParts());
+        for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) {
+            int finalPartIdx = partIdx;
+            Thread thread = new Thread(() -> {
+                try {
+                    InputStreamContainer inputStreamContainer = streamContext.provideStream(finalPartIdx);
+                    InputStream inputStream = inputStreamContainer.getInputStream();
+                    long remainingContentLength = inputStreamContainer.getContentLength();
+                    long offset = partSize * finalPartIdx;
+                    while (remainingContentLength > 0) {
+                        int readContentLength = inputStream.read(buffer, (int) offset, (int) remainingContentLength);
+                        totalContentRead.addAndGet(readContentLength);
+                        remainingContentLength -= readContentLength;
+                        offset += readContentLength;
+                    }
+                    inputStream.close();
+                } catch (IOException e) {
+                    completionListener.onFailure(e);
+                } finally {
+                    latch.countDown();
+                }
+            });
+            thread.start();
+        }
+        try {
+            if (!latch.await(TRANSFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+                throw new IOException("Timed out waiting for file transfer to complete for " + writeContext.getFileName());
+            }
+        } catch (InterruptedException e) {
+            throw new IOException("Await interrupted on CountDownLatch, transfer failed for " + writeContext.getFileName());
+        }
+        try (OutputStream outputStream = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) {
+            outputStream.write(buffer);
+        }
+        if (writeContext.getFileSize() != totalContentRead.get()) {
+            throw new IOException(
+                "Incorrect content length read for file "
+                    + writeContext.getFileName()
+                    + ", actual file size: "
+                    + writeContext.getFileSize()
+                    + ", bytes read: "
+                    + totalContentRead.get()
+            );
+        }
+
+        try {
+            // bulks need to succeed for segment files to be generated
+            if (isSegmentFile(writeContext.getFileName()) && triggerDataIntegrityFailure) {
+                completionListener.onFailure(
+                    new RuntimeException(
+                        new CorruptIndexException(
+                            "Data integrity check failure for file: " + writeContext.getFileName(),
+                            writeContext.getFileName()
+                        )
+                    )
+                );
+            } else {
+                writeContext.getUploadFinalizer().accept(true);
+                completionListener.onResponse(null);
+            }
+        } catch (Exception e) {
+            completionListener.onFailure(e);
+        }
+
+    }
+
+    // Utility to derive the checkpoint file name (translog.ckp) for a given translog file (translog.tlog).
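+    // For example, a translog file named "translog-5.tlog" (name illustrative) maps to "translog-5.ckp".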
+    private String getCheckpointFileName(String translogFileName) {
+        if (!translogFileName.endsWith(".tlog")) {
+            throw new IllegalArgumentException("Invalid translog file name format: " + translogFileName);
+        }
+
+        int dotIndex = translogFileName.lastIndexOf('.');
+        String baseName = translogFileName.substring(0, dotIndex);
+        return baseName + ".ckp";
+    }
+
+    public static String convertToBase64(InputStream inputStream) throws IOException {
+        try (inputStream) {
+            byte[] buffer = new byte[128];
+            int bytesRead;
+            int totalBytesRead = 0;
+            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+
+            while ((bytesRead = inputStream.read(buffer)) != -1) {
+                byteArrayOutputStream.write(buffer, 0, bytesRead);
+                totalBytesRead += bytesRead;
+                if (totalBytesRead > 1024) {
+                    // We enforce a limit of 1KB on the size of the checkpoint file.
+                    throw new AssertionError("Input stream exceeds 1KB limit");
+                }
+            }
+
+            byte[] bytes = byteArrayOutputStream.toByteArray();
+            return Base64.getEncoder().encodeToString(bytes);
+        }
+    }
+
+    // During a readBlobWithMetadata call, we separately download the translog.ckp file and return its content as metadata.
+    @Override
+    public FetchBlobResult readBlobWithMetadata(String blobName) throws IOException {
+        InputStream inputStream = readBlob(blobName);
+        InputStream ckpInputStream = readBlob(getCheckpointFileName(blobName));
+        String ckpString = convertToBase64(ckpInputStream);
+        Map<String, String> metadata = new HashMap<>();
+        metadata.put(CHECKPOINT_FILE_DATA_KEY, ckpString);
+        return new FetchBlobResult(inputStream, metadata);
+    }
+
+    @Override
+    public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
+        new Thread(() -> {
+            try {
+                long contentLength = listBlobs().get(blobName).length();
+                long partSize = contentLength / 10;
+                int numberOfParts = (int) ((contentLength % partSize) == 0 ? contentLength / partSize : (contentLength / partSize) + 1);
+                List<ReadContext.StreamPartCreator> blobPartStreams = new ArrayList<>();
+                for (int partNumber = 0; partNumber < numberOfParts; partNumber++) {
+                    long offset = partNumber * partSize;
+                    InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset);
+                    blobPartStreams.add(() -> CompletableFuture.completedFuture(blobPartStream));
+                }
+                ReadContext blobReadContext = new ReadContext.Builder(contentLength, blobPartStreams).build();
+                listener.onResponse(blobReadContext);
+            } catch (Exception e) {
+                listener.onFailure(e);
+            }
+        }).start();
+    }
+
+    public boolean remoteIntegrityCheckSupported() {
+        return true;
+    }
+
+    private boolean isSegmentFile(String filename) {
+        return !filename.endsWith(".tlog") && !filename.endsWith(".ckp");
+    }
+}
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedBlobStore.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedBlobStore.java
new file mode 100644
index 0000000000000..89dd91c8222ac
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedBlobStore.java
@@ -0,0 +1,44 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.remotestore.translogmetadata.mocks;
+
+import org.opensearch.OpenSearchException;
+import org.opensearch.common.blobstore.BlobContainer;
+import org.opensearch.common.blobstore.BlobPath;
+import org.opensearch.common.blobstore.fs.FsBlobStore;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+public class MockFsMetadataSupportedBlobStore extends FsBlobStore {
+
+    private final boolean triggerDataIntegrityFailure;
+
+    public MockFsMetadataSupportedBlobStore(int bufferSizeInBytes, Path path, boolean readonly, boolean triggerDataIntegrityFailure)
+        throws IOException {
+        super(bufferSizeInBytes, path, readonly);
+        this.triggerDataIntegrityFailure = triggerDataIntegrityFailure;
+    }
+
+    @Override
+    public BlobContainer blobContainer(BlobPath path) {
+        try {
+            return new MockFsMetadataSupportedBlobContainer(this, path, buildAndCreate(path), triggerDataIntegrityFailure);
+        } catch (IOException ex) {
+            throw new OpenSearchException("failed to create blob container", ex);
+        }
+    }
+
+    // Advertise blob metadata support for this mock FS blob store.
+    @Override
+    public boolean isBlobMetadataEnabled() {
+        return true;
+    }
+
+}
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedRepository.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedRepository.java
new file mode 100644
index 0000000000000..333fba413ce4e
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedRepository.java
@@ -0,0 +1,51 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.remotestore.translogmetadata.mocks;
+
+import org.opensearch.cluster.metadata.RepositoryMetadata;
+import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.common.blobstore.BlobStore;
+import org.opensearch.common.blobstore.fs.FsBlobStore;
+import org.opensearch.common.settings.Setting;
+import org.opensearch.core.xcontent.NamedXContentRegistry;
+import org.opensearch.env.Environment;
+import org.opensearch.indices.recovery.RecoverySettings;
+import org.opensearch.repositories.fs.FsRepository;
+
+public class MockFsMetadataSupportedRepository extends FsRepository {
+
+    public static Setting<Boolean> TRIGGER_DATA_INTEGRITY_FAILURE = Setting.boolSetting(
+        "mock_fs_repository.trigger_data_integrity_failure",
+        false
+    );
+
+    private final boolean triggerDataIntegrityFailure;
+
+    public MockFsMetadataSupportedRepository(
+        RepositoryMetadata metadata,
+        Environment environment,
+        NamedXContentRegistry namedXContentRegistry,
+        ClusterService clusterService,
+        RecoverySettings recoverySettings
+    ) {
+        super(metadata, environment, namedXContentRegistry, clusterService, recoverySettings);
+        triggerDataIntegrityFailure = TRIGGER_DATA_INTEGRITY_FAILURE.get(metadata.settings());
+    }
+
+    @Override
+    protected BlobStore createBlobStore() throws Exception {
+        FsBlobStore fsBlobStore = (FsBlobStore) super.createBlobStore();
+        return new MockFsMetadataSupportedBlobStore(
+            fsBlobStore.bufferSizeInBytes(),
+            fsBlobStore.path(),
+            isReadOnly(),
+            triggerDataIntegrityFailure
+        );
+    }
+}
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedRepositoryPlugin.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedRepositoryPlugin.java
new file mode 100644
index 0000000000000..71ae652a6b23d
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedRepositoryPlugin.java
@@ -0,0 +1,38 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.remotestore.translogmetadata.mocks;
+
+import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.core.xcontent.NamedXContentRegistry;
+import org.opensearch.env.Environment;
+import org.opensearch.indices.recovery.RecoverySettings;
+import org.opensearch.plugins.Plugin;
+import org.opensearch.plugins.RepositoryPlugin;
+import org.opensearch.repositories.Repository;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class MockFsMetadataSupportedRepositoryPlugin extends Plugin implements RepositoryPlugin {
+
+    public static final String TYPE_MD = "fs_metadata_supported_repository";
+
+    @Override
+    public Map<String, Repository.Factory> getRepositories(
+        Environment env,
+        NamedXContentRegistry namedXContentRegistry,
+        ClusterService clusterService,
+        RecoverySettings recoverySettings
+    ) {
+        return Collections.singletonMap(
+            "fs_metadata_supported_repository",
+            metadata -> new MockFsMetadataSupportedRepository(metadata, env, namedXContentRegistry, clusterService, recoverySettings)
+        );
+    }
+}
diff --git a/server/src/main/java/org/opensearch/common/blobstore/FetchBlobResult.java b/server/src/main/java/org/opensearch/common/blobstore/FetchBlobResult.java
index 55aca771b586c..4e8ea4c6b089f 100644
--- a/server/src/main/java/org/opensearch/common/blobstore/FetchBlobResult.java
+++ b/server/src/main/java/org/opensearch/common/blobstore/FetchBlobResult.java
@@ -10,6 +10,8 @@
 
 import org.opensearch.common.annotation.ExperimentalApi;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.io.InputStream;
 import java.util.Map;
 
@@ -20,7 +22,7 @@
  * @opensearch.experimental
  */
 @ExperimentalApi
-public class FetchBlobResult {
+public class FetchBlobResult implements Closeable {
 
     /**
      * Downloaded blob InputStream
@@ -45,4 +47,10 @@ public FetchBlobResult(InputStream inputStream, Map<String, String> metadata) {
         this.metadata = metadata;
     }
 
+    @Override
+    public void close() throws IOException {
+        if (inputStream != null) {
+            inputStream.close();
+        }
+    }
 }
diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java
index f9c2df0ea2048..95899f338313e 100644
--- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java
+++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java
@@ -276,13 +276,11 @@ private Map<String, String> downloadTranslogFileAndGetMetadata(String fileName,
         long bytesToRead = 0, downloadStartTime = System.nanoTime();
         Map<String, String> metadata;
-        try {
-            FetchBlobResult inputStreamWithMetadata = transferService.downloadBlobWithMetadata(
-                remoteDataTransferPath.add(primaryTerm),
-                fileName
-            );
-            InputStream inputStream = inputStreamWithMetadata.getInputStream();
-            metadata = inputStreamWithMetadata.getMetadata();
+        try (
+            FetchBlobResult fetchBlobResult = transferService.downloadBlobWithMetadata(remoteDataTransferPath.add(primaryTerm), fileName)
+        ) {
+            InputStream inputStream = fetchBlobResult.getInputStream();
+            metadata = fetchBlobResult.getMetadata();
             bytesToRead = inputStream.available();
             Files.copy(inputStream, filePath);
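
Note on the FetchBlobResult change above: now that it implements Closeable, call sites that obtain one
should hold it in try-with-resources so the wrapped InputStream is released on every path, as the
TranslogTransferManager hunk does. A minimal consumer-side sketch (the class and its copyBlob helper are
illustrative, not part of this patch):

    import org.opensearch.common.blobstore.FetchBlobResult;

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Map;

    class FetchBlobResultUsage {
        // Drains the blob to disk and returns the metadata map (e.g. the base64 "ckp-data" entry
        // produced by MockFsMetadataSupportedBlobContainer#readBlobWithMetadata).
        static Map<String, String> copyBlob(FetchBlobResult fetched, Path target) throws IOException {
            try (FetchBlobResult result = fetched) {
                Map<String, String> metadata = result.getMetadata();
                Files.copy(result.getInputStream(), target);
                return metadata; // plain strings, safe to use after close()
            } // close() releases the underlying InputStream even if Files.copy throws
        }
    }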