From 9f089e7b429f7f7758ea544507c4132873ec1cab Mon Sep 17 00:00:00 2001 From: Sandeep Kumawat <2025sandeepkumawat@gmail.com> Date: Wed, 15 May 2024 22:32:06 +0530 Subject: [PATCH] enhance tests and cleanup Signed-off-by: Sandeep Kumawat <2025sandeepkumawat@gmail.com> --- ...ava => TranslogMetadataRemoteStoreIT.java} | 60 +---------------- ...TranslogMetadataRemoteStoreRestoreIT.java} | 15 +---- .../TranslogMetadataRemoteStoreStatsIT.java | 46 +++++++++++++ .../MockFsMetadataSupportedBlobContainer.java | 11 +-- .../transfer/BlobStoreTransferService.java | 3 +- .../transfer/TranslogTransferManager.java | 67 ++++++++----------- .../test/OpenSearchIntegTestCase.java | 1 + 7 files changed, 84 insertions(+), 119 deletions(-) rename server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/{RemoteStoreTranslogMetadataIT.java => TranslogMetadataRemoteStoreIT.java} (62%) rename server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/{RemoteStoreTranslogMetadataRestoreIT.java => TranslogMetadataRemoteStoreRestoreIT.java} (78%) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/TranslogMetadataRemoteStoreStatsIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/RemoteStoreTranslogMetadataIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/TranslogMetadataRemoteStoreIT.java similarity index 62% rename from server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/RemoteStoreTranslogMetadataIT.java rename to server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/TranslogMetadataRemoteStoreIT.java index a66fb64b3ea24..20dd1f2a121d4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/RemoteStoreTranslogMetadataIT.java +++ 
b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/TranslogMetadataRemoteStoreIT.java @@ -9,91 +9,33 @@ package org.opensearch.remotestore.translogmetadata; import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.cluster.metadata.RepositoryMetadata; -import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.settings.Settings; -import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.plugins.Plugin; import org.opensearch.remotestore.RemoteStoreIT; import org.opensearch.remotestore.translogmetadata.mocks.MockFsMetadataSupportedRepositoryPlugin; -import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; -import org.junit.Before; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; import java.util.Collection; -import java.util.Locale; -import java.util.Map; -import java.util.stream.Collectors; import java.util.stream.Stream; import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG; import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class RemoteStoreTranslogMetadataIT extends RemoteStoreIT { - - protected final String INDEX_NAME = "remote-store-test-idx-1"; - Path repositoryLocation; - boolean compress; - boolean overrideBuildRepositoryMetadata; +public class 
TranslogMetadataRemoteStoreIT extends RemoteStoreIT { @Override protected Collection> nodePlugins() { return Arrays.asList(MockTransportService.TestPlugin.class, MockFsMetadataSupportedRepositoryPlugin.class); } - @Before - public void setup() { - clusterSettingsSuppliedByTest = true; - overrideBuildRepositoryMetadata = false; - repositoryLocation = randomRepoPath(); - compress = randomBoolean(); - } - - @Override - public RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String name) { - if (overrideBuildRepositoryMetadata) { - Map nodeAttributes = node.getAttributes(); - String type = nodeAttributes.get(String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, name)); - - String settingsAttributeKeyPrefix = String.format( - Locale.getDefault(), - REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, - name - ); - Map settingsMap = node.getAttributes() - .keySet() - .stream() - .filter(key -> key.startsWith(settingsAttributeKeyPrefix)) - .collect(Collectors.toMap(key -> key.replace(settingsAttributeKeyPrefix, ""), key -> node.getAttributes().get(key))); - - Settings.Builder settings = Settings.builder(); - settingsMap.entrySet().forEach(entry -> settings.put(entry.getKey(), entry.getValue())); - settings.put(BlobStoreRepository.SYSTEM_REPOSITORY_SETTING.getKey(), true); - - if (name.equals(REPOSITORY_NAME)) { - settings.put("location", repositoryLocation) - .put("compress", compress) - .put("max_remote_upload_bytes_per_sec", "1kb") - .put("chunk_size", 100, ByteSizeUnit.BYTES); - return new RepositoryMetadata(name, MockFsMetadataSupportedRepositoryPlugin.TYPE_MD, settings.build()); - } - return new RepositoryMetadata(name, type, settings.build()); - } else { - return super.buildRepositoryMetadata(node, name); - } - - } - @Override protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() diff --git 
a/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/RemoteStoreTranslogMetadataRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/TranslogMetadataRemoteStoreRestoreIT.java similarity index 78% rename from server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/RemoteStoreTranslogMetadataRestoreIT.java rename to server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/TranslogMetadataRemoteStoreRestoreIT.java index 29a7e1c73eea6..cb294e5bceee8 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/RemoteStoreTranslogMetadataRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/TranslogMetadataRemoteStoreRestoreIT.java @@ -14,25 +14,12 @@ import org.opensearch.remotestore.translogmetadata.mocks.MockFsMetadataSupportedRepositoryPlugin; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; -import org.junit.Before; -import java.nio.file.Path; import java.util.Arrays; import java.util.Collection; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class RemoteStoreTranslogMetadataRestoreIT extends RemoteStoreRestoreIT { - Path repositoryLocation; - boolean compress; - boolean overrideBuildRepositoryMetadata; - - @Before - public void setup() { - clusterSettingsSuppliedByTest = true; - overrideBuildRepositoryMetadata = false; - repositoryLocation = randomRepoPath(); - compress = randomBoolean(); - } +public class TranslogMetadataRemoteStoreRestoreIT extends RemoteStoreRestoreIT { @Override protected Collection> nodePlugins() { diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/TranslogMetadataRemoteStoreStatsIT.java 
b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/TranslogMetadataRemoteStoreStatsIT.java new file mode 100644 index 0000000000000..c3af2853715ce --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/TranslogMetadataRemoteStoreStatsIT.java @@ -0,0 +1,46 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore.translogmetadata; + +import org.opensearch.common.settings.Settings; +import org.opensearch.plugins.Plugin; +import org.opensearch.remotestore.RemoteStoreStatsIT; +import org.opensearch.remotestore.translogmetadata.mocks.MockFsMetadataSupportedRepositoryPlugin; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; + +import java.util.Arrays; +import java.util.Collection; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class TranslogMetadataRemoteStoreStatsIT extends RemoteStoreStatsIT { + + @Override + public Collection> nodePlugins() { + return Arrays.asList(MockTransportService.TestPlugin.class, MockFsMetadataSupportedRepositoryPlugin.class); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put( + remoteStoreClusterSettings( + REPOSITORY_NAME, + segmentRepoPath, + MockFsMetadataSupportedRepositoryPlugin.TYPE_MD, + REPOSITORY_2_NAME, + translogRepoPath, + MockFsMetadataSupportedRepositoryPlugin.TYPE_MD + ) + ) + .build(); + } + +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedBlobContainer.java 
b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedBlobContainer.java index 9762b7d1336cb..347ff5b9c56dd 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedBlobContainer.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedBlobContainer.java @@ -59,13 +59,14 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp final Path file = path.resolve(writeContext.getFileName()); byte[] buffer = new byte[(int) writeContext.getFileSize()]; - // If the upload writeContext have a non-null metadata, we store the metadata content as translog file name. + // If the upload writeContext has a non-null metadata, we store the metadata content as translog.ckp file. if (writeContext.getMetadata() != null) { String base64String = writeContext.getMetadata().get(CHECKPOINT_FILE_DATA_KEY); byte[] decodedBytes = Base64.getDecoder().decode(base64String); ByteArrayInputStream inputStream = new ByteArrayInputStream(decodedBytes); int length = decodedBytes.length; - writeBlob(getCheckpointFileName(writeContext.getFileName()), inputStream, length, true); + String ckpFileName = getCheckpointFileName(writeContext.getFileName()); + writeBlob(ckpFileName, inputStream, length, true); } AtomicLong totalContentRead = new AtomicLong(); @@ -147,11 +148,10 @@ private String getCheckpointFileName(String translogFileName) { } public static String convertToBase64(InputStream inputStream) throws IOException { - try (inputStream) { + try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) { byte[] buffer = new byte[128]; int bytesRead; int totalBytesRead = 0; - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); while ((bytesRead = inputStream.read(buffer)) != -1) { byteArrayOutputStream.write(buffer, 0, bytesRead); @@ -171,7 +171,8 @@ public
static String convertToBase64(InputStream inputStream) throws IOException @Override public FetchBlobResult readBlobWithMetadata(String blobName) throws IOException { InputStream inputStream = readBlob(blobName); - InputStream ckpInputStream = readBlob(getCheckpointFileName(blobName)); + String ckpFileName = getCheckpointFileName(blobName); + InputStream ckpInputStream = readBlob(ckpFileName); String ckpString = convertToBase64(ckpInputStream); Map metadata = new HashMap<>(); metadata.put(CHECKPOINT_FILE_DATA_KEY, ckpString); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index e11ac47c0f31c..da259f291ec07 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -111,11 +111,10 @@ public void uploadBlobs( // Builds a metadata map containing the Base64-encoded checkpoint file data associated with a translog file. 
static Map buildTransferFileMetadata(InputStream metadataInputStream) throws IOException { Map metadata = new HashMap<>(); - try (metadataInputStream) { + try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) { byte[] buffer = new byte[128]; int bytesRead; int totalBytesRead = 0; - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); while ((bytesRead = metadataInputStream.read(buffer)) != -1) { byteArrayOutputStream.write(buffer, 0, bytesRead); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 95899f338313e..e1195bc9611fc 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -251,11 +251,11 @@ public boolean downloadTranslog(String primaryTerm, String generation, Path loca String translogFilename = Translog.getFilename(Long.parseLong(generation)); if (isTranslogMetadataEnabled == false) { // Download Checkpoint file, translog file from remote to local FS - downloadToFS(ckpFileName, location, primaryTerm); - downloadToFS(translogFilename, location, primaryTerm); + downloadToFS(ckpFileName, location, primaryTerm, false); + downloadToFS(translogFilename, location, primaryTerm, false); } else { // Download translog.tlog file with object metadata from remote to local FS - Map metadata = downloadTranslogFileAndGetMetadata(translogFilename, location, primaryTerm); + Map metadata = downloadToFS(translogFilename, location, primaryTerm, true); try { assert metadata != null && !metadata.isEmpty() && metadata.containsKey(CHECKPOINT_FILE_DATA_KEY); recoverCkpFileUsingMetadata(metadata, location, generation, translogFilename); @@ -266,36 +266,6 @@ public boolean downloadTranslog(String primaryTerm, String generation, Path loca return true; } 
- private Map downloadTranslogFileAndGetMetadata(String fileName, Path location, String primaryTerm) throws IOException { - Path filePath = location.resolve(fileName); - // Here, we always override the existing file if present. - // We need to change this logic when we introduce incremental download - deleteFileIfExists(filePath); - - boolean downloadStatus = false; - long bytesToRead = 0, downloadStartTime = System.nanoTime(); - Map metadata; - - try ( - FetchBlobResult fetchBlobResult = transferService.downloadBlobWithMetadata(remoteDataTransferPath.add(primaryTerm), fileName) - ) { - InputStream inputStream = fetchBlobResult.getInputStream(); - metadata = fetchBlobResult.getMetadata(); - - bytesToRead = inputStream.available(); - Files.copy(inputStream, filePath); - downloadStatus = true; - } finally { - remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L); - if (downloadStatus) { - remoteTranslogTransferTracker.addDownloadBytesSucceeded(bytesToRead); - } - } - // Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync - fileTransferTracker.add(fileName, true); - return metadata; - } - /** * Process the provided metadata and tries to recover translog.ckp file to the FS. */ @@ -318,19 +288,37 @@ private void recoverCkpFileUsingMetadata(Map metadata, Path loca Files.write(filePath, ckpFileBytes); } - private void downloadToFS(String fileName, Path location, String primaryTerm) throws IOException { + private Map downloadToFS(String fileName, Path location, String primaryTerm, boolean withMetadata) throws IOException { Path filePath = location.resolve(fileName); // Here, we always override the existing file if present. 
// We need to change this logic when we introduce incremental download deleteFileIfExists(filePath); + Map metadata = null; boolean downloadStatus = false; long bytesToRead = 0, downloadStartTime = System.nanoTime(); - try (InputStream inputStream = transferService.downloadBlob(remoteDataTransferPath.add(primaryTerm), fileName)) { - // Capture number of bytes for stats before reading - bytesToRead = inputStream.available(); - Files.copy(inputStream, filePath); - downloadStatus = true; + try { + if (withMetadata) { + try ( + FetchBlobResult fetchBlobResult = transferService.downloadBlobWithMetadata( + remoteDataTransferPath.add(primaryTerm), + fileName + ) + ) { + InputStream inputStream = fetchBlobResult.getInputStream(); + metadata = fetchBlobResult.getMetadata(); + + bytesToRead = inputStream.available(); + Files.copy(inputStream, filePath); + downloadStatus = true; + } + } else { + try (InputStream inputStream = transferService.downloadBlob(remoteDataTransferPath.add(primaryTerm), fileName)) { + bytesToRead = inputStream.available(); + Files.copy(inputStream, filePath); + downloadStatus = true; + } + } } finally { remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L); if (downloadStatus) { @@ -340,6 +328,7 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th // Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync fileTransferTracker.add(fileName, true); + return metadata; } private void deleteFileIfExists(Path filePath) throws IOException { diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index a9f6fdc86155d..7630ca59ff0ff 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -2640,6 +2640,7 @@ 
private static Settings buildRemoteStoreNodeAttributes( .put(segmentRepoSettingsAttributeKeyPrefix + "chunk_size", 200, ByteSizeUnit.BYTES); } settings.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), randomFrom(PathType.values())); + settings.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_TRANSLOG_METADATA.getKey(), randomBoolean()); return settings.build(); }