Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Remote Store] Fix couple of Remote Store flaky test and use bulk api… #9251

Merged
merged 1 commit into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
package org.opensearch.remotestore;

import org.junit.After;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.UUIDs;
Expand All @@ -31,10 +35,10 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase {
Expand Down Expand Up @@ -74,13 +78,18 @@ protected Map<String, Long> indexData(int numberOfIterations, boolean invokeFlus
indexingStats.put(MAX_SEQ_NO_REFRESHED_OR_FLUSHED + "-shard-" + shardId, maxSeqNoRefreshedOrFlushed);
refreshedOrFlushedOperations = totalOperations;
int numberOfOperations = randomIntBetween(20, 50);
for (int j = 0; j < numberOfOperations; j++) {
IndexResponse response = indexSingleDoc(index);
maxSeqNo = response.getSeqNo();
shardId = response.getShardId().id();
indexingStats.put(MAX_SEQ_NO_TOTAL + "-shard-" + shardId, maxSeqNo);
int numberOfBulk = randomIntBetween(1, 5);
for (int j = 0; j < numberOfBulk; j++) {
BulkResponse res = indexBulk(index, numberOfOperations);
for (BulkItemResponse singleResp : res.getItems()) {
indexingStats.put(
MAX_SEQ_NO_TOTAL + "-shard-" + singleResp.getResponse().getShardId().id(),
singleResp.getResponse().getSeqNo()
);
maxSeqNo = singleResp.getResponse().getSeqNo();
}
totalOperations += numberOfOperations;
}
totalOperations += numberOfOperations;
}

indexingStats.put(TOTAL_OPERATIONS, totalOperations);
Expand Down Expand Up @@ -123,6 +132,18 @@ protected IndexResponse indexSingleDoc(String indexName) {
.get();
}

protected BulkResponse indexBulk(String indexName, int numDocs) {
BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < numDocs; i++) {
final IndexRequest request = client().prepareIndex(indexName)
.setId(UUIDs.randomBase64UUID())
.setSource(documentKeys.get(randomIntBetween(0, documentKeys.size() - 1)), randomAlphaOfLength(5))
.request();
bulkRequest.add(request);
}
return client().bulk(bulkRequest).actionGet();
}

public static Settings remoteStoreClusterSettings(String segmentRepoName) {
return remoteStoreClusterSettings(segmentRepoName, segmentRepoName);
}
Expand Down Expand Up @@ -170,10 +191,11 @@ protected Settings remoteStoreIndexSettings(int numberOfReplicas) {
return remoteStoreIndexSettings(numberOfReplicas, 1);
}

protected Settings remoteStoreIndexSettings(int numberOfReplicas, long totalFieldLimit) {
protected Settings remoteStoreIndexSettings(int numberOfReplicas, long totalFieldLimit, int refresh) {
return Settings.builder()
.put(remoteStoreIndexSettings(numberOfReplicas))
.put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), totalFieldLimit)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), String.valueOf(refresh))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@

package org.opensearch.remotestore;

import org.hamcrest.MatcherAssert;
import org.junit.Before;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.shard.RemoteStoreRefreshListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand All @@ -29,7 +29,9 @@
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.comparesEqualTo;
import static org.hamcrest.Matchers.comparesEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.oneOf;
import static org.opensearch.index.shard.RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

Expand Down Expand Up @@ -143,10 +145,9 @@ public void testRemoteTranslogCleanup() throws Exception {
verifyRemoteStoreCleanup();
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8658")
public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
internalCluster().startDataOnlyNodes(3);
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l));
internalCluster().startDataOnlyNodes(1);
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
String indexUUID = client().admin()
Expand All @@ -158,20 +159,22 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
// Delete is async.
assertBusy(() -> {
int actualFileCount = getFileCount(indexPath);
if (numberOfIterations <= RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP) {
assertEquals(numberOfIterations, actualFileCount);
if (numberOfIterations <= LAST_N_METADATA_FILES_TO_KEEP) {
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1)));
} else {
// As delete is async its possible that the file gets created before the deletion or after
// deletion.
assertTrue(actualFileCount >= 10 || actualFileCount <= 11);
MatcherAssert.assertThat(
actualFileCount,
is(oneOf(LAST_N_METADATA_FILES_TO_KEEP - 1, LAST_N_METADATA_FILES_TO_KEEP, LAST_N_METADATA_FILES_TO_KEEP + 1))
);
}
}, 30, TimeUnit.SECONDS);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8658")
public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
internalCluster().startDataOnlyNodes(3);
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l));
internalCluster().startDataOnlyNodes(1);
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, false, INDEX_NAME);
String indexUUID = client().admin()
Expand All @@ -180,6 +183,8 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID, "/0/segments/metadata");
assertEquals(numberOfIterations, getFileCount(indexPath));
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ public void testPromoteReplicaToPrimary() throws Exception {
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), numOfDocs);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9130")
public void testFailoverWhileIndexing() throws Exception {
internalCluster().startNode();
internalCluster().startNode();
Expand All @@ -143,7 +142,7 @@ public void testFailoverWhileIndexing() throws Exception {
.setSource("field", numAutoGenDocs.get())
.get();

if (indexResponse.status() == RestStatus.CREATED || indexResponse.status() == RestStatus.ACCEPTED) {
if (indexResponse.status() == RestStatus.CREATED || indexResponse.status() == RestStatus.OK) {
numAutoGenDocs.incrementAndGet();
if (numAutoGenDocs.get() == docCount / 2) {
if (random().nextInt(3) == 0) {
Expand Down