Skip to content

Commit

Permalink
[Remote Store] Upload translog checkpoint as object metadata to trans…
Browse files Browse the repository at this point in the history
…log (#13637)

* Upload translog checkpoint as object metadata to translog
Signed-off-by: Sandeep Kumawat <[email protected]>
  • Loading branch information
skumawat2025 committed May 23, 2024
1 parent 03c13cb commit de4e20c
Show file tree
Hide file tree
Showing 56 changed files with 1,287 additions and 385 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for Azure Managed Identity in repository-azure ([#12423](https://github.com/opensearch-project/OpenSearch/issues/12423))
- Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478))
- Make outbound side of transport protocol dependent ([#13293](https://github.com/opensearch-project/OpenSearch/pull/13293))
- [Remote Store] Upload translog checkpoint as object metadata to translog.tlog([#13637](https://github.com/opensearch-project/OpenSearch/pull/13637))

### Dependencies
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStoreException;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.FetchBlobResult;
import org.opensearch.common.blobstore.InputStreamWithMetadata;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
Expand Down Expand Up @@ -143,9 +143,9 @@ public boolean blobExists(String blobName) {

@ExperimentalApi
@Override
public FetchBlobResult readBlobWithMetadata(String blobName) throws IOException {
public InputStreamWithMetadata readBlobWithMetadata(String blobName) throws IOException {
S3RetryingInputStream s3RetryingInputStream = new S3RetryingInputStream(blobStore, buildKey(blobName));
return new FetchBlobResult(s3RetryingInputStream, s3RetryingInputStream.getMetadata());
return new InputStreamWithMetadata(s3RetryingInputStream, s3RetryingInputStream.getMetadata());

Check warning on line 148 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java#L148

Added line #L148 was not covered by tests
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,11 @@ public Map<Metric, Map<String, Long>> extendedStats() {
return extendedStats;
}

@Override
public boolean isBlobMetadataEnabled() {
return true;

Check warning on line 249 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java#L249

Added line #L249 was not covered by tests
}

public ObjectCannedACL getCannedACL() {
return cannedACL;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.index.query.TermsQueryBuilder;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.VersionUtils;

import java.util.concurrent.ExecutionException;
Expand All @@ -60,6 +61,7 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteCloneIndexIT extends RemoteStoreBaseIntegTestCase {

@Override
Expand Down Expand Up @@ -139,6 +141,7 @@ public void testCreateCloneIndex() {
}

public void testCreateCloneIndexFailure() throws ExecutionException, InterruptedException {
asyncUploadMockFsRepo = false;
Version version = VersionUtils.randomIndexCompatibleVersion(random());
int numPrimaryShards = 1;
prepareCreate("source").setSettings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.VersionUtils;
import org.junit.Before;

import java.util.Arrays;
import java.util.Map;
Expand All @@ -61,12 +63,18 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteShrinkIndexIT extends RemoteStoreBaseIntegTestCase {
@Override
protected boolean forbidPrivateIndexSettings() {
return false;
}

@Before
public void setup() {
asyncUploadMockFsRepo = false;
}

public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
Expand All @@ -84,6 +92,7 @@ public void testCreateShrinkIndexToN() {
int[] shardSplits = randomFrom(possibleShardSplits);
assertEquals(shardSplits[0], (shardSplits[0] / shardSplits[1]) * shardSplits[1]);
assertEquals(shardSplits[1], (shardSplits[1] / shardSplits[2]) * shardSplits[2]);

internalCluster().ensureAtLeastNumDataNodes(2);
prepareCreate("source").setSettings(Settings.builder().put(indexSettings()).put("number_of_shards", shardSplits[0])).get();
for (int i = 0; i < 20; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.VersionUtils;

import java.io.IOException;
Expand All @@ -86,6 +87,7 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteSplitIndexIT extends RemoteStoreBaseIntegTestCase {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
Expand All @@ -32,6 +33,11 @@ public class RemoteClusterStateServiceIT extends RemoteStoreBaseIntegTestCase {

private static String INDEX_NAME = "test-index";

@Before
public void setup() {
asyncUploadMockFsRepo = false;
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,5 +512,6 @@ private void assertCustomIndexMetadata(String index) {
logger.info("---> Asserting custom index metadata");
IndexMetadata iMd = internalCluster().client().admin().cluster().prepareState().get().getState().metadata().index(index);
assertNotNull(iMd.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY));
assertNotNull(iMd.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY).get(IndexMetadata.TRANSLOG_METADATA_KEY));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@
import org.opensearch.action.index.IndexResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class BaseRemoteStoreRestoreIT extends RemoteStoreBaseIntegTestCase {
static final String INDEX_NAME = "remote-store-test-idx-1";
static final String INDEX_NAMES = "test-remote-store-1,test-remote-store-2,remote-store-test-index-1,remote-store-test-index-2";
Expand All @@ -39,7 +42,7 @@ public Settings indexSettings(int shards, int replicas) {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockTransportService.TestPlugin.class)).collect(Collectors.toList());
}

protected void restore(String... indices) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.junit.Before;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
Expand All @@ -50,7 +49,7 @@ public class PrimaryTermValidationIT extends RemoteStoreBaseIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockTransportService.TestPlugin.class)).collect(Collectors.toList());
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
import org.opensearch.remotestore.translogmetadata.mocks.MockFsMetadataSupportedRepositoryPlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.ReloadableFsRepository;
Expand All @@ -48,13 +51,15 @@
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
Expand All @@ -74,6 +79,8 @@ public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase {
protected Path segmentRepoPath;
protected Path translogRepoPath;
protected boolean clusterSettingsSuppliedByTest = false;
protected boolean asyncUploadMockFsRepo = randomBoolean();
private boolean metadataSupportedType = randomBoolean();
private final List<String> documentKeys = List.of(
randomAlphaOfLength(5),
randomAlphaOfLength(5),
Expand Down Expand Up @@ -129,6 +136,19 @@ protected Map<String, Long> indexData(int numberOfIterations, boolean invokeFlus
return indexingStats;
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
if (!clusterSettingsSuppliedByTest && asyncUploadMockFsRepo) {
if (metadataSupportedType) {
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockFsMetadataSupportedRepositoryPlugin.class))
.collect(Collectors.toList());
} else {
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockFsRepositoryPlugin.class)).collect(Collectors.toList());
}
}
return super.nodePlugins();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
if (segmentRepoPath == null || translogRepoPath == null) {
Expand All @@ -138,10 +158,27 @@ protected Settings nodeSettings(int nodeOrdinal) {
if (clusterSettingsSuppliedByTest) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build();
} else {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath))
.build();
if (asyncUploadMockFsRepo) {
String repoType = metadataSupportedType ? MockFsMetadataSupportedRepositoryPlugin.TYPE_MD : MockFsRepositoryPlugin.TYPE;
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(
remoteStoreClusterSettings(
REPOSITORY_NAME,
segmentRepoPath,
repoType,
REPOSITORY_2_NAME,
translogRepoPath,
repoType
)
)
.build();
} else {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath))
.build();
}
}
}

Expand Down Expand Up @@ -221,6 +258,8 @@ protected Settings remoteStoreIndexSettings(int numberOfReplicas, long totalFiel
@After
public void teardown() {
clusterSettingsSuppliedByTest = false;
asyncUploadMockFsRepo = randomBoolean();
metadataSupportedType = randomBoolean();
assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_NAME);
assertRemoteStoreRepositoryOnAllNodes(REPOSITORY_2_NAME);
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;

import java.io.IOException;
import java.nio.file.Files;
Expand All @@ -47,6 +48,11 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreClusterStateRestoreIT extends BaseRemoteStoreRestoreIT {

@Before
public void setup() {
asyncUploadMockFsRepo = false;
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
Expand All @@ -37,7 +38,7 @@ public class RemoteStoreForceMergeIT extends RemoteStoreBaseIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockTransportService.TestPlugin.class)).collect(Collectors.toList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
Expand All @@ -48,14 +47,14 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
Expand All @@ -81,7 +80,7 @@ public class RemoteStoreIT extends RemoteStoreBaseIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, MockFsRepositoryPlugin.class);
return Stream.concat(super.nodePlugins().stream(), Stream.of(MockTransportService.TestPlugin.class)).collect(Collectors.toList());
}

@Override
Expand Down Expand Up @@ -797,25 +796,8 @@ public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionExcep
// Test local only translog files which are not uploaded to remote store (no metadata present in remote)
// Without the cleanup change in RemoteFsTranslog.createEmptyTranslog, this test fails with NPE.
public void testLocalOnlyTranslogCleanupOnNodeRestart() throws Exception {
clusterSettingsSuppliedByTest = true;

// Overriding settings to use AsyncMultiStreamBlobContainer
Settings settings = Settings.builder()
.put(super.nodeSettings(1))
.put(
remoteStoreClusterSettings(
REPOSITORY_NAME,
segmentRepoPath,
MockFsRepositoryPlugin.TYPE,
REPOSITORY_2_NAME,
translogRepoPath,
MockFsRepositoryPlugin.TYPE
)
)
.build();

internalCluster().startClusterManagerOnlyNode(settings);
String dataNode = internalCluster().startDataOnlyNode(settings);
internalCluster().startClusterManagerOnlyNode();
String dataNode = internalCluster().startDataOnlyNode();

// 1. Create index with 0 replica
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1));
Expand Down
Loading

0 comments on commit de4e20c

Please sign in to comment.