Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Fix couple of Remote Store flaky test and use bulk api for ingestion #9190

Merged
merged 2 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
package org.opensearch.remotestore;

import org.junit.After;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.UUIDs;
Expand All @@ -31,10 +35,10 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase {
Expand Down Expand Up @@ -74,13 +78,18 @@ protected Map<String, Long> indexData(int numberOfIterations, boolean invokeFlus
indexingStats.put(MAX_SEQ_NO_REFRESHED_OR_FLUSHED + "-shard-" + shardId, maxSeqNoRefreshedOrFlushed);
refreshedOrFlushedOperations = totalOperations;
int numberOfOperations = randomIntBetween(20, 50);
for (int j = 0; j < numberOfOperations; j++) {
IndexResponse response = indexSingleDoc(index);
maxSeqNo = response.getSeqNo();
shardId = response.getShardId().id();
indexingStats.put(MAX_SEQ_NO_TOTAL + "-shard-" + shardId, maxSeqNo);
int numberOfBulk = randomIntBetween(1, 5);
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
for (int j = 0; j < numberOfBulk; j++) {
BulkResponse res = indexBulk(index, numberOfOperations);
for (BulkItemResponse singleResp : res.getItems()) {
indexingStats.put(
MAX_SEQ_NO_TOTAL + "-shard-" + singleResp.getResponse().getShardId().id(),
singleResp.getResponse().getSeqNo()
);
maxSeqNo = singleResp.getResponse().getSeqNo();
}
totalOperations += numberOfOperations;
}
totalOperations += numberOfOperations;
}

indexingStats.put(TOTAL_OPERATIONS, totalOperations);
Expand Down Expand Up @@ -123,6 +132,18 @@ protected IndexResponse indexSingleDoc(String indexName) {
.get();
}

protected BulkResponse indexBulk(String indexName, int numDocs) {
BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < numDocs; i++) {
final IndexRequest request = client().prepareIndex(indexName)
.setId(UUIDs.randomBase64UUID())
.setSource(documentKeys.get(randomIntBetween(0, documentKeys.size() - 1)), randomAlphaOfLength(5))
.request();
bulkRequest.add(request);
}
return client().bulk(bulkRequest).actionGet();
}

public static Settings remoteStoreClusterSettings(String segmentRepoName) {
return remoteStoreClusterSettings(segmentRepoName, segmentRepoName);
}
Expand Down Expand Up @@ -170,10 +191,11 @@ protected Settings remoteStoreIndexSettings(int numberOfReplicas) {
return remoteStoreIndexSettings(numberOfReplicas, 1);
}

protected Settings remoteStoreIndexSettings(int numberOfReplicas, long totalFieldLimit) {
protected Settings remoteStoreIndexSettings(int numberOfReplicas, long totalFieldLimit, int refresh) {
return Settings.builder()
.put(remoteStoreIndexSettings(numberOfReplicas))
.put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), totalFieldLimit)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), String.valueOf(refresh))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.shard.RemoteStoreRefreshListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand All @@ -29,12 +28,10 @@
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.oneOf;
import static org.hamcrest.Matchers.comparesEqualTo;
import static org.hamcrest.Matchers.comparesEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.oneOf;
import static org.opensearch.index.shard.RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

Expand Down Expand Up @@ -148,10 +145,9 @@ public void testRemoteTranslogCleanup() throws Exception {
verifyRemoteStoreCleanup();
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8658")
public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
internalCluster().startDataOnlyNodes(3);
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l));
internalCluster().startDataOnlyNodes(1);
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
String indexUUID = client().admin()
Expand All @@ -163,20 +159,22 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
// Delete is async.
assertBusy(() -> {
int actualFileCount = getFileCount(indexPath);
if (numberOfIterations <= RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP) {
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations, numberOfIterations + 1)));
if (numberOfIterations <= LAST_N_METADATA_FILES_TO_KEEP) {
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1)));
} else {
// As delete is async its possible that the file gets created before the deletion or after
// deletion.
MatcherAssert.assertThat(actualFileCount, is(oneOf(10, 11)));
MatcherAssert.assertThat(
actualFileCount,
is(oneOf(LAST_N_METADATA_FILES_TO_KEEP - 1, LAST_N_METADATA_FILES_TO_KEEP, LAST_N_METADATA_FILES_TO_KEEP + 1))
);
}
}, 30, TimeUnit.SECONDS);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8658")
public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
internalCluster().startDataOnlyNodes(3);
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l));
internalCluster().startDataOnlyNodes(1);
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, false, INDEX_NAME);
String indexUUID = client().admin()
Expand All @@ -187,6 +185,6 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID, "/0/segments/metadata");
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations, numberOfIterations + 1)));
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ public void testPromoteReplicaToPrimary() throws Exception {
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), numOfDocs);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9130")
public void testFailoverWhileIndexing() throws Exception {
internalCluster().startNode();
internalCluster().startNode();
Expand All @@ -143,7 +142,7 @@ public void testFailoverWhileIndexing() throws Exception {
.setSource("field", numAutoGenDocs.get())
.get();

if (indexResponse.status() == RestStatus.CREATED || indexResponse.status() == RestStatus.ACCEPTED) {
if (indexResponse.status() == RestStatus.CREATED || indexResponse.status() == RestStatus.OK) {
numAutoGenDocs.incrementAndGet();
if (numAutoGenDocs.get() == docCount / 2) {
if (random().nextInt(3) == 0) {
Expand Down