Skip to content

Commit

Permalink
enhance tests and cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Sandeep Kumawat <[email protected]>
  • Loading branch information
skumawat2025 committed May 15, 2024
1 parent 3e65cc5 commit 9f089e7
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,91 +9,33 @@
package org.opensearch.remotestore.translogmetadata;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotestore.RemoteStoreIT;
import org.opensearch.remotestore.translogmetadata.mocks.MockFsMetadataSupportedRepositoryPlugin;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.junit.Before;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreTranslogMetadataIT extends RemoteStoreIT {

protected final String INDEX_NAME = "remote-store-test-idx-1";
Path repositoryLocation;
boolean compress;
boolean overrideBuildRepositoryMetadata;
public class TranslogMetadataRemoteStoreIT extends RemoteStoreIT {

// Registers the mock metadata-supported FS repository plugin (plus the mock
// transport plugin) so this IT exercises the translog-metadata blob-store path.
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, MockFsMetadataSupportedRepositoryPlugin.class);
}

// Per-test initialization: the test supplies its own cluster settings, repository
// metadata override is off by default, and the repo location/compression are randomized.
@Before
public void setup() {
clusterSettingsSuppliedByTest = true;
overrideBuildRepositoryMetadata = false;
repositoryLocation = randomRepoPath();
compress = randomBoolean();
}

// Builds RepositoryMetadata for the given node/repository name. When
// overrideBuildRepositoryMetadata is set, repository settings are reconstructed
// from the node's remote-store attributes; otherwise the superclass default is used.
@Override
public RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String name) {
if (overrideBuildRepositoryMetadata) {
Map<String, String> nodeAttributes = node.getAttributes();
String type = nodeAttributes.get(String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, name));

String settingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
name
);
// Collect every node attribute under the settings prefix, stripping the
// prefix to recover the raw repository setting keys.
Map<String, String> settingsMap = node.getAttributes()
.keySet()
.stream()
.filter(key -> key.startsWith(settingsAttributeKeyPrefix))
.collect(Collectors.toMap(key -> key.replace(settingsAttributeKeyPrefix, ""), key -> node.getAttributes().get(key)));

Settings.Builder settings = Settings.builder();
settingsMap.entrySet().forEach(entry -> settings.put(entry.getKey(), entry.getValue()));
// Mark as a system repository so the cluster treats it as internally managed.
settings.put(BlobStoreRepository.SYSTEM_REPOSITORY_SETTING.getKey(), true);

if (name.equals(REPOSITORY_NAME)) {
// For the primary test repository, pin location/compression and throttle
// uploads with a tiny chunk size to exercise multi-chunk transfer paths.
settings.put("location", repositoryLocation)
.put("compress", compress)
.put("max_remote_upload_bytes_per_sec", "1kb")
.put("chunk_size", 100, ByteSizeUnit.BYTES);
return new RepositoryMetadata(name, MockFsMetadataSupportedRepositoryPlugin.TYPE_MD, settings.build());
}
return new RepositoryMetadata(name, type, settings.build());
} else {
return super.buildRepositoryMetadata(node, name);
}

}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,12 @@
import org.opensearch.remotestore.translogmetadata.mocks.MockFsMetadataSupportedRepositoryPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.junit.Before;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreTranslogMetadataRestoreIT extends RemoteStoreRestoreIT {
Path repositoryLocation;
boolean compress;
boolean overrideBuildRepositoryMetadata;

// Per-test initialization mirroring the non-restore translog-metadata IT:
// test-supplied cluster settings, no metadata override, randomized repo/compression.
@Before
public void setup() {
clusterSettingsSuppliedByTest = true;
overrideBuildRepositoryMetadata = false;
repositoryLocation = randomRepoPath();
compress = randomBoolean();
}
public class TranslogMetadataRemoteStoreRestoreIT extends RemoteStoreRestoreIT {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore.translogmetadata;

import org.opensearch.common.settings.Settings;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotestore.RemoteStoreStatsIT;
import org.opensearch.remotestore.translogmetadata.mocks.MockFsMetadataSupportedRepositoryPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;

import java.util.Arrays;
import java.util.Collection;

/**
 * Runs the {@link RemoteStoreStatsIT} suite against the metadata-supported mock FS
 * repository, so remote-store stats are exercised with translog metadata enabled
 * for both the segment and translog repositories.
 */
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class TranslogMetadataRemoteStoreStatsIT extends RemoteStoreStatsIT {

// Registers the metadata-supported mock repository plugin alongside the mock transport.
@Override
public Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, MockFsMetadataSupportedRepositoryPlugin.class);
}

// Points both the segment and translog repositories at the metadata-supported
// mock repository type (TYPE_MD) on top of the inherited node settings.
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(
remoteStoreClusterSettings(
REPOSITORY_NAME,
segmentRepoPath,
MockFsMetadataSupportedRepositoryPlugin.TYPE_MD,
REPOSITORY_2_NAME,
translogRepoPath,
MockFsMetadataSupportedRepositoryPlugin.TYPE_MD
)
)
.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,14 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
final Path file = path.resolve(writeContext.getFileName());
byte[] buffer = new byte[(int) writeContext.getFileSize()];

// If the upload writeContext have a non-null metadata, we store the metadata content as translog file name.
// If the upload writeContext have a non-null metadata, we store the metadata content as translog.ckp file.
if (writeContext.getMetadata() != null) {
String base64String = writeContext.getMetadata().get(CHECKPOINT_FILE_DATA_KEY);
byte[] decodedBytes = Base64.getDecoder().decode(base64String);
ByteArrayInputStream inputStream = new ByteArrayInputStream(decodedBytes);
int length = decodedBytes.length;
writeBlob(getCheckpointFileName(writeContext.getFileName()), inputStream, length, true);
String ckpFileName = getCheckpointFileName(writeContext.getFileName());
writeBlob(ckpFileName, inputStream, length, true);
}

AtomicLong totalContentRead = new AtomicLong();
Expand Down Expand Up @@ -147,11 +148,10 @@ private String getCheckpointFileName(String translogFileName) {
}

public static String convertToBase64(InputStream inputStream) throws IOException {
try (inputStream) {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
byte[] buffer = new byte[128];
int bytesRead;
int totalBytesRead = 0;
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();

while ((bytesRead = inputStream.read(buffer)) != -1) {
byteArrayOutputStream.write(buffer, 0, bytesRead);
Expand All @@ -171,7 +171,8 @@ public static String convertToBase64(InputStream inputStream) throws IOException
@Override
public FetchBlobResult readBlobWithMetadata(String blobName) throws IOException {
InputStream inputStream = readBlob(blobName);
InputStream ckpInputStream = readBlob(getCheckpointFileName(blobName));
String ckpFileName = getCheckpointFileName(blobName);
InputStream ckpInputStream = readBlob(ckpFileName);
String ckpString = convertToBase64(ckpInputStream);
Map<String, String> metadata = new HashMap<>();
metadata.put(CHECKPOINT_FILE_DATA_KEY, ckpString);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,10 @@ public void uploadBlobs(
// Builds a metadata map containing the Base64-encoded checkpoint file data associated with a translog file.
static Map<String, String> buildTransferFileMetadata(InputStream metadataInputStream) throws IOException {
Map<String, String> metadata = new HashMap<>();
try (metadataInputStream) {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
byte[] buffer = new byte[128];
int bytesRead;
int totalBytesRead = 0;
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();

while ((bytesRead = metadataInputStream.read(buffer)) != -1) {
byteArrayOutputStream.write(buffer, 0, bytesRead);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,11 @@ public boolean downloadTranslog(String primaryTerm, String generation, Path loca
String translogFilename = Translog.getFilename(Long.parseLong(generation));
if (isTranslogMetadataEnabled == false) {
// Download Checkpoint file, translog file from remote to local FS
downloadToFS(ckpFileName, location, primaryTerm);
downloadToFS(translogFilename, location, primaryTerm);
downloadToFS(ckpFileName, location, primaryTerm, false);
downloadToFS(translogFilename, location, primaryTerm, false);
} else {
// Download translog.tlog file with object metadata from remote to local FS
Map<String, String> metadata = downloadTranslogFileAndGetMetadata(translogFilename, location, primaryTerm);
Map<String, String> metadata = downloadToFS(translogFilename, location, primaryTerm, true);
try {
assert metadata != null && !metadata.isEmpty() && metadata.containsKey(CHECKPOINT_FILE_DATA_KEY);
recoverCkpFileUsingMetadata(metadata, location, generation, translogFilename);
Expand All @@ -266,36 +266,6 @@ public boolean downloadTranslog(String primaryTerm, String generation, Path loca
return true;
}

// Downloads the named translog file (with its object metadata) from the remote
// store into `location`, records download stats, marks the file in the transfer
// tracker, and returns the blob's metadata map.
private Map<String, String> downloadTranslogFileAndGetMetadata(String fileName, Path location, String primaryTerm) throws IOException {
Path filePath = location.resolve(fileName);
// Here, we always override the existing file if present.
// We need to change this logic when we introduce incremental download
deleteFileIfExists(filePath);

boolean downloadStatus = false;
long bytesToRead = 0, downloadStartTime = System.nanoTime();
Map<String, String> metadata;

try (
FetchBlobResult fetchBlobResult = transferService.downloadBlobWithMetadata(remoteDataTransferPath.add(primaryTerm), fileName)
) {
InputStream inputStream = fetchBlobResult.getInputStream();
metadata = fetchBlobResult.getMetadata();

// NOTE(review): available() is only an estimate of the remaining bytes, so the
// succeeded-bytes stat below may under-report for streamed blobs — confirm.
bytesToRead = inputStream.available();
Files.copy(inputStream, filePath);
downloadStatus = true;
} finally {
// Always record elapsed time; record byte count only on success.
remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L);
if (downloadStatus) {
remoteTranslogTransferTracker.addDownloadBytesSucceeded(bytesToRead);
}
}
// Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync
fileTransferTracker.add(fileName, true);
return metadata;
}

/**
* Process the provided metadata and tries to recover translog.ckp file to the FS.
*/
Expand All @@ -318,19 +288,37 @@ private void recoverCkpFileUsingMetadata(Map<String, String> metadata, Path loca
Files.write(filePath, ckpFileBytes);
}

private void downloadToFS(String fileName, Path location, String primaryTerm) throws IOException {
private Map<String, String> downloadToFS(String fileName, Path location, String primaryTerm, boolean withMetadata) throws IOException {
Path filePath = location.resolve(fileName);
// Here, we always override the existing file if present.
// We need to change this logic when we introduce incremental download
deleteFileIfExists(filePath);

Map<String, String> metadata = null;
boolean downloadStatus = false;
long bytesToRead = 0, downloadStartTime = System.nanoTime();
try (InputStream inputStream = transferService.downloadBlob(remoteDataTransferPath.add(primaryTerm), fileName)) {
// Capture number of bytes for stats before reading
bytesToRead = inputStream.available();
Files.copy(inputStream, filePath);
downloadStatus = true;
try {
if (withMetadata) {
try (
FetchBlobResult fetchBlobResult = transferService.downloadBlobWithMetadata(
remoteDataTransferPath.add(primaryTerm),
fileName
)
) {
InputStream inputStream = fetchBlobResult.getInputStream();
metadata = fetchBlobResult.getMetadata();

bytesToRead = inputStream.available();
Files.copy(inputStream, filePath);
downloadStatus = true;
}
} else {
try (InputStream inputStream = transferService.downloadBlob(remoteDataTransferPath.add(primaryTerm), fileName)) {
bytesToRead = inputStream.available();
Files.copy(inputStream, filePath);
downloadStatus = true;
}
}
} finally {
remoteTranslogTransferTracker.addDownloadTimeInMillis((System.nanoTime() - downloadStartTime) / 1_000_000L);
if (downloadStatus) {
Expand All @@ -340,6 +328,7 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th

// Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync
fileTransferTracker.add(fileName, true);
return metadata;
}

private void deleteFileIfExists(Path filePath) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2640,6 +2640,7 @@ private static Settings buildRemoteStoreNodeAttributes(
.put(segmentRepoSettingsAttributeKeyPrefix + "chunk_size", 200, ByteSizeUnit.BYTES);
}
settings.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), randomFrom(PathType.values()));
settings.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_TRANSLOG_METADATA.getKey(), randomBoolean());
return settings.build();
}

Expand Down

0 comments on commit 9f089e7

Please sign in to comment.