Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SnapshotV2] Support centralize snapshot creation #15124

Merged
merged 35 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
9142a25
Initial Commit to support centralize snapshot creation and implicit l…
Aug 6, 2024
d9bbc65
Fix deserilization error
Aug 6, 2024
f72702f
Fix gradle spotless check
Aug 6, 2024
8e85231
Fix listener
Aug 6, 2024
8a507ad
Merge branch 'main' into snapshot-pinned-timestamp
Aug 7, 2024
2d404e8
Fix test
Aug 7, 2024
90c860c
Fix snapshot generation
Aug 8, 2024
193da65
Modify cluster setting name
Aug 14, 2024
fe2aaaf
Add more tests
Aug 14, 2024
14c08ae
Merge branch 'main' into snapshot-pinned-timestamp
Aug 15, 2024
ec17028
Merge branch 'main' into snapshot-pinned-timestamp
Aug 20, 2024
6504169
Uncomment pin timestamp code
Aug 20, 2024
626c2fa
Modify log messages
Aug 21, 2024
be65f6d
Add spotless check failure fix
Aug 21, 2024
62452ee
Fix completion listener for snapshot v2
Aug 21, 2024
00031ec
Elevate cluster state update priority for repository metadata update …
Aug 21, 2024
0c636ef
Add more integ tests
Aug 22, 2024
623f994
Add priority as IMMEDIATE for cluster state repo update task only for…
Aug 23, 2024
2e4795b
Fix build error
Aug 23, 2024
a6090a7
Fix spotless error
Aug 23, 2024
b5d012f
Add repository setting for snapshot v2
Aug 23, 2024
1394d8c
Merge branch 'main' into snapshot-pinned-timestamp
Aug 23, 2024
80bf6cc
Address review comments
Aug 26, 2024
b0cbc08
Add integ test to verify snapshot creation if shallow copy repo setti…
Aug 26, 2024
38af0f6
Fix spotless vilation error
Aug 26, 2024
73376a8
Address review comment
Aug 26, 2024
39b57e3
Address review comments
Aug 26, 2024
e1eecbd
Add min version check for backward compatibility
Aug 26, 2024
983d2b5
address review comments
Aug 27, 2024
a423475
add integ test for master failover scenario
Aug 27, 2024
207b03d
Add more integ tests
Aug 27, 2024
872e136
refactor code
Aug 27, 2024
1ef061a
add changelog
Aug 27, 2024
0289d67
Merge branch 'main' into snapshot-pinned-timestamp
anshu1106 Aug 28, 2024
fb48a2d
Add pinned timestamp setting in integ tests
Aug 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.blobstore.BlobStoreException;
Expand Down Expand Up @@ -391,6 +392,7 @@ public void finalizeSnapshot(
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
Priority repositoryUpdatePriority,
ActionListener<RepositoryData> listener
) {
super.finalizeSnapshot(
Expand All @@ -400,6 +402,7 @@ public void finalizeSnapshot(
snapshotInfo,
repositoryMetaVersion,
stateTransformer,
repositoryUpdatePriority,
listener
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.delete.DeleteResponse;
Expand All @@ -23,6 +24,7 @@
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.index.Index;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.index.IndexService;
Expand All @@ -32,6 +34,11 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryData;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotRestoreException;
Expand All @@ -46,6 +53,7 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -746,4 +754,283 @@ public void testInvalidRestoreRequestScenarios() throws Exception {
assertTrue(exception.getMessage().contains("cannot remove setting [index.remote_store.segment.repository]" + " on restore"));
}

public void testCreateSnapshotV2() throws Exception {

internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode();
String indexName1 = "testindex1";
String indexName2 = "testindex2";
String indexName3 = "testindex3";
String snapshotRepoName = "test-create-snapshot-repo";
String snapshotName1 = "test-create-snapshot1";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

assertAcked(
client().admin()
.cluster()
.preparePutRepository(snapshotRepoName)
.setType(FsRepository.TYPE)
.setSettings(
Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
anshu1106 marked this conversation as resolved.
Show resolved Hide resolved
.put(BlobStoreRepository.SNAPSHOT_V2.getKey(), true)
)
);

Client client = client();
Settings indexSettings = getIndexSettings(20, 0).build();
createIndex(indexName1, indexSettings);

Settings indexSettings2 = getIndexSettings(15, 0).build();
createIndex(indexName2, indexSettings2);

final int numDocsInIndex1 = 10;
final int numDocsInIndex2 = 20;
indexDocuments(client, indexName1, numDocsInIndex1);
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

SnapshotInfo snapshotInfo = createSnapshot(snapshotRepoName, snapshotName1, Collections.emptyList());
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L));

indexDocuments(client, indexName1, 10);
indexDocuments(client, indexName2, 20);

createIndex(indexName3, indexSettings);
indexDocuments(client, indexName3, 10);

String snapshotName2 = "test-create-snapshot2";

// verify even if waitForCompletion is not true, the request executes in a sync manner
CreateSnapshotResponse createSnapshotResponse2 = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName2)
.get();
snapshotInfo = createSnapshotResponse2.getSnapshotInfo();
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName2));
assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L));

}

public void testConcurrentSnapshotV2CreateOperation() throws InterruptedException, ExecutionException {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode();
String indexName1 = "testindex1";
String indexName2 = "testindex2";
String snapshotRepoName = "test-create-snapshot-repo";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

assertAcked(
client().admin()
.cluster()
.preparePutRepository(snapshotRepoName)
.setType(FsRepository.TYPE)
.setSettings(
Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SNAPSHOT_V2.getKey(), true)
)
);

Client client = client();
Settings indexSettings = getIndexSettings(20, 0).build();
createIndex(indexName1, indexSettings);

Settings indexSettings2 = getIndexSettings(15, 0).build();
createIndex(indexName2, indexSettings2);

final int numDocsInIndex1 = 10;
final int numDocsInIndex2 = 20;
indexDocuments(client, indexName1, numDocsInIndex1);
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

int concurrentSnapshots = 5;

// Prepare threads for concurrent snapshot creation
List<Thread> threads = new ArrayList<>();

for (int i = 0; i < concurrentSnapshots; i++) {
int snapshotIndex = i;
Thread thread = new Thread(() -> {
try {
String snapshotName = "snapshot-concurrent-" + snapshotIndex;
CreateSnapshotResponse createSnapshotResponse2 = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName)
.get();
SnapshotInfo snapshotInfo = createSnapshotResponse2.getSnapshotInfo();
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName));
assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L));
} catch (Exception e) {}
});
threads.add(thread);
}
// start all threads
for (Thread thread : threads) {
thread.start();
}

// Wait for all threads to complete
for (Thread thread : threads) {
thread.join();
}

// Validate that only one snapshot has been created
Repository repository = internalCluster().getInstance(RepositoriesService.class).repository(snapshotRepoName);
PlainActionFuture<RepositoryData> repositoryDataPlainActionFuture = new PlainActionFuture<>();
repository.getRepositoryData(repositoryDataPlainActionFuture);

RepositoryData repositoryData = repositoryDataPlainActionFuture.get();
assertThat(repositoryData.getSnapshotIds().size(), equalTo(1));
anshu1106 marked this conversation as resolved.
Show resolved Hide resolved
}

public void testCreateSnapshotV2WithRedIndex() throws Exception {
anshu1106 marked this conversation as resolved.
Show resolved Hide resolved
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode();
String indexName1 = "testindex1";
String indexName2 = "testindex2";
String indexName3 = "testindex3";
anshu1106 marked this conversation as resolved.
Show resolved Hide resolved
String snapshotRepoName = "test-create-snapshot-repo";
String snapshotName1 = "test-create-snapshot1";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

assertAcked(
client().admin()
.cluster()
.preparePutRepository(snapshotRepoName)
.setType(FsRepository.TYPE)
.setSettings(
Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SNAPSHOT_V2.getKey(), true)
)
);

Client client = client();
Settings indexSettings = getIndexSettings(20, 0).build();
createIndex(indexName1, indexSettings);

Settings indexSettings2 = getIndexSettings(15, 0).build();
createIndex(indexName2, indexSettings2);

final int numDocsInIndex1 = 10;
final int numDocsInIndex2 = 20;
indexDocuments(client, indexName1, numDocsInIndex1);
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

internalCluster().ensureAtLeastNumDataNodes(0);
ensureRed(indexName1);
ensureRed(indexName2);
CreateSnapshotResponse createSnapshotResponse2 = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.get();
SnapshotInfo snapshotInfo = createSnapshotResponse2.getSnapshotInfo();
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName1));
assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L));
}

public void testCreateSnapshotV2WithIndexingLoad() throws Exception {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode();
String indexName1 = "testindex1";
String indexName2 = "testindex2";
String snapshotRepoName = "test-create-snapshot-repo";
String snapshotName1 = "test-create-snapshot1";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

assertAcked(
client().admin()
.cluster()
.preparePutRepository(snapshotRepoName)
.setType(FsRepository.TYPE)
.setSettings(
Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SNAPSHOT_V2.getKey(), true)
)
);

Client client = client();
Settings indexSettings = getIndexSettings(20, 0).build();
createIndex(indexName1, indexSettings);

Settings indexSettings2 = getIndexSettings(15, 0).build();
createIndex(indexName2, indexSettings2);

final int numDocsInIndex1 = 10;
final int numDocsInIndex2 = 20;
indexDocuments(client, indexName1, numDocsInIndex1);
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

Thread indexingThread = new Thread(() -> {
try {
for (int i = 0; i < 50; i++) {
internalCluster().client().prepareIndex("test-index-load").setSource("field", "value" + i).execute().actionGet();
}
} catch (Exception e) {
fail("indexing failed due to exception: " + e.getMessage());
}
});

// Start indexing
indexingThread.start();

// Wait for a bit to let some documents be indexed
Thread.sleep(1000);

// Create a snapshot while indexing is ongoing
CreateSnapshotResponse createSnapshotResponse2 = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.get();

SnapshotInfo snapshotInfo = createSnapshotResponse2.getSnapshotInfo();
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName1));
assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L));
assertTrue(snapshotInfo.indices().contains("test-index-load"));
assertTrue(snapshotInfo.indices().contains(indexName1));
assertTrue(snapshotInfo.indices().contains(indexName2));
indexingThread.join();

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -127,6 +128,7 @@ public void finalizeSnapshot(
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
Priority repositoryUpdatePriority,
ActionListener<RepositoryData> listener
) {
super.finalizeSnapshot(
Expand All @@ -136,6 +138,7 @@ public void finalizeSnapshot(
snapshotInfo,
repositoryMetaVersion,
stateTransformer,
repositoryUpdatePriority,
listener
);
}
Expand Down
Loading
Loading