Skip to content

Commit

Permalink
fix-flaky: MultiCodecReindexIT
Browse files Browse the repository at this point in the history
The test spawns a random number of data, client, and manager nodes, which
sometimes leads to a connect_exception when any of the test nodes fails
to connect to the other node(s), causing flakiness. Let's use only
one data node, as the test requires, to reduce flakiness.

Signed-off-by: Rohit Ashiwal <[email protected]>
  • Loading branch information
r1walz committed Oct 29, 2023
1 parent 73bbeb5 commit a179e4a
Showing 1 changed file with 87 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@
import org.opensearch.index.reindex.ReindexRequestBuilder;
import org.opensearch.index.reindex.ReindexTestCase;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand All @@ -34,14 +35,10 @@
import java.util.stream.IntStream;

import static java.util.stream.Collectors.toList;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_METADATA;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_READ;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_WRITE;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;

@ClusterScope(scope = Scope.SUITE, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false)
public class MultiCodecReindexIT extends ReindexTestCase {

@Override
Expand All @@ -50,7 +47,6 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
}

public void testReindexingMultipleCodecs() throws InterruptedException, ExecutionException {
internalCluster().ensureAtLeastNumDataNodes(1);
Map<String, String> codecMap = Map.of(
"best_compression",
"BEST_COMPRESSION",
Expand All @@ -61,72 +57,94 @@ public void testReindexingMultipleCodecs() throws InterruptedException, Executio
"lz4",
"BEST_SPEED"
);

for (Map.Entry<String, String> codec : codecMap.entrySet()) {
assertReindexingWithMultipleCodecs(codec.getKey(), codec.getValue(), codecMap);
}

}

private void assertReindexingWithMultipleCodecs(String destCodec, String destCodecMode, Map<String, String> codecMap)
throws ExecutionException, InterruptedException {

final String index = "test-index" + destCodec;
final String destIndex = "dest-index" + destCodec;

// creating source index
createIndex(
index,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.codec", "default")
.put("index.merge.policy.max_merged_segment", "1b")
.build()
);
ensureGreen(index);

final int nbDocs = randomIntBetween(2, 5);
final String index = "source-index";

{ // Setup
// create source index
createIndex(
index,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.codec", "default")
.put("index.merge.policy.max_merged_segment", "1b")
.build()
);
ensureGreen(index);

// index using all codecs
for (Map.Entry<String, String> codec : codecMap.entrySet()) {
useCodec(index, codec.getKey());

// perform index
indexRandom(
randomBoolean(),
false,
randomBoolean(),
IntStream.range(0, nbDocs)
.mapToObj(i -> client().prepareIndex(index).setId(UUID.randomUUID().toString()).setSource("num", i))
.collect(toList())
);

// perform flush
FlushResponse flushResponse = client().admin().indices().prepareFlush(index).setForce(true).execute().actionGet();
assertNoFailures(flushResponse);

// indexing with all 4 codecs
for (Map.Entry<String, String> codec : codecMap.entrySet()) {
useCodec(index, codec.getKey());
ingestDocs(index, nbDocs);
}

assertTrue(
getSegments(index).stream()
.flatMap(s -> s.getAttributes().values().stream())
.collect(Collectors.toSet())
.containsAll(codecMap.values())
);
// perform refresh
RefreshResponse refreshResponse = client().admin().indices().prepareRefresh(index).execute().actionGet();
assertNoFailures(refreshResponse);
}

// creating destination index with destination codec
createIndex(
destIndex,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.codec", destCodec)
.build()
);
assertTrue(
getSegments(index).stream()
.flatMap(s -> s.getAttributes().values().stream())
.collect(Collectors.toSet())
.containsAll(codecMap.values())
);
}
{ // Test reindex for each codec
for (Map.Entry<String, String> codec : codecMap.entrySet()) {
final String destCodecMode = codec.getValue();
final String destCodec = codec.getKey();

final String destIndex = "dest-index" + destCodec;
final int expectedResponseSize = codecMap.size() * nbDocs;

// create destination index with selected codec
createIndex(
destIndex,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.codec", destCodec)
.build()
);
ensureGreen(destIndex);

// perform reindex
BulkByScrollResponse response = new ReindexRequestBuilder(client(), ReindexAction.INSTANCE).source(index)
.destination(destIndex)
.refresh(true)
.waitForActiveShards(ActiveShardCount.ONE)
.get();

// assertions
assertEquals(0, response.getNoops());
assertEquals(1, response.getBatches());
assertEquals(0, response.getDeleted());
assertEquals(0, response.getVersionConflicts());
assertEquals(0, response.getBulkFailures().size());
assertEquals(0, response.getSearchFailures().size());

assertEquals(expectedResponseSize, response.getTotal());
assertEquals(expectedResponseSize, response.getCreated());

assertTrue(response.getTook().getMillis() > 0);
assertTrue(getSegments(destIndex).stream().allMatch(segment -> segment.attributes.containsValue(destCodecMode)));
}
}

BulkByScrollResponse bulkResponse = new ReindexRequestBuilder(client(), ReindexAction.INSTANCE).source(index)
.destination(destIndex)
.refresh(true)
.waitForActiveShards(ActiveShardCount.ONE)
.get();

assertEquals(codecMap.size() * nbDocs, bulkResponse.getCreated());
assertEquals(codecMap.size() * nbDocs, bulkResponse.getTotal());
assertEquals(0, bulkResponse.getDeleted());
assertEquals(0, bulkResponse.getNoops());
assertEquals(0, bulkResponse.getVersionConflicts());
assertEquals(1, bulkResponse.getBatches());
assertTrue(bulkResponse.getTook().getMillis() > 0);
assertEquals(0, bulkResponse.getBulkFailures().size());
assertEquals(0, bulkResponse.getSearchFailures().size());
assertTrue(getSegments(destIndex).stream().allMatch(segment -> segment.attributes.containsValue(destCodecMode)));
}

private void useCodec(String index, String codec) throws ExecutionException, InterruptedException {
Expand All @@ -142,46 +160,7 @@ private void useCodec(String index, String codec) throws ExecutionException, Int
assertAcked(client().admin().indices().prepareOpen(index).setWaitForActiveShards(1));
}

private void flushAndRefreshIndex(String index) {

// Request is not blocked
for (String blockSetting : Arrays.asList(
SETTING_BLOCKS_READ,
SETTING_BLOCKS_WRITE,
SETTING_READ_ONLY,
SETTING_BLOCKS_METADATA,
SETTING_READ_ONLY_ALLOW_DELETE
)) {
try {
enableIndexBlock(index, blockSetting);
// flush
FlushResponse flushResponse = client().admin().indices().prepareFlush(index).setForce(true).execute().actionGet();
assertNoFailures(flushResponse);

// refresh
RefreshResponse refreshResponse = client().admin().indices().prepareRefresh(index).execute().actionGet();
assertNoFailures(refreshResponse);
} finally {
disableIndexBlock(index, blockSetting);
}
}
}

private void ingestDocs(String index, int nbDocs) throws InterruptedException {

indexRandom(
randomBoolean(),
false,
randomBoolean(),
IntStream.range(0, nbDocs)
.mapToObj(i -> client().prepareIndex(index).setId(UUID.randomUUID().toString()).setSource("num", i))
.collect(toList())
);
flushAndRefreshIndex(index);
}

private ArrayList<Segment> getSegments(String index) {

return new ArrayList<>(
client().admin()
.indices()
Expand Down

0 comments on commit a179e4a

Please sign in to comment.