Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replication IT for #12047 #12792

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
Expand Down Expand Up @@ -69,6 +70,7 @@
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.SetOnce;
Expand Down Expand Up @@ -823,7 +825,14 @@ private List<RecoveryState> findRecoveriesForTargetNode(String nodeName, List<Re

/**
 * Creates and populates an index using a document count obtained from {@link #numDocs()}.
 * All work is delegated to the overload that accepts an explicit document count.
 *
 * @param name         name of the index to create
 * @param nodeCount    forwarded unchanged to the delegate overload
 * @param shardCount   forwarded unchanged to the delegate overload
 * @param replicaCount forwarded unchanged to the delegate overload
 * @return whatever the delegate overload returns
 */
private IndicesStatsResponse createAndPopulateIndex(String name, int nodeCount, int shardCount, int replicaCount)
    throws ExecutionException, InterruptedException {
    return createAndPopulateIndex(name, nodeCount, shardCount, replicaCount, numDocs());
}

private IndicesStatsResponse createAndPopulateIndex(String name, int nodeCount, int shardCount, int replicaCount, int numDocs)
throws ExecutionException, InterruptedException {

assert numDocs >= 0;
logger.info("--> creating test index: {}", name);
assertAcked(
prepareCreate(
Expand All @@ -838,7 +847,6 @@ private IndicesStatsResponse createAndPopulateIndex(String name, int nodeCount,
ensureGreen();

logger.info("--> indexing sample data");
final int numDocs = numDocs();
final IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs];

for (int i = 0; i < numDocs; i++) {
Expand All @@ -864,6 +872,155 @@ protected int numDocs() {
return between(MIN_DOC_COUNT, MAX_DOC_COUNT);
}

/**
 * <b>Temporary:</b> attempts to reproduce the issue described in
 * <a href="https://github.com/opensearch-project/OpenSearch/issues/12047">GitHub issue #12047</a>.
 *
 * <p>Scenario:
 * <ol>
 * <li>Start a single-node cluster.</li>
 * <li>Create four indices with some data, each with a single primary shard and no replicas.</li>
 * <li>Check that both counters (*) agree: only active shards and no ongoing recoveries.</li>
 * <li>Slow down (throttle) recovery.</li>
 * <li>Disable shard rebalancing.</li>
 * <li>Start a second node.</li>
 * <li>Verify that cluster rebalancing does not kick in.</li>
 * <li>Force all shards to start relocating from node A to node B.</li>
 * <li>While the shards are relocating, compare both counters (*) again and check that every
 * recovery is in the {@code INDEX} stage.</li>
 * <li>Lift the throttling limit and wait for the relocation to finish.</li>
 * </ol>
 *
 * <p>(*) "both counters" are the values obtained via two different calls:
 * <ul>
 * <li>{@code client().admin().cluster().health(...)}</li>
 * <li>{@code client().admin().indices().recoveries(...)}</li>
 * </ul>
 *
 * <p>The original report involves two kinds of calls: an admin cluster health request (a transport
 * level request) and a "_cat" API call to get recoveries (a REST level request). Combining both in a
 * single integration test is a challenge because the REST call requires a properly configured
 * transport layer. Although DanglingIndicesRestIT has examples of tests combining these two request
 * types, controlling the cluster lifecycle remains a challenge in such an environment. Instead, this
 * test issues the transport level request that the RestCatRecoveryAction class performs internally.
 * Test {@link #testRerouteRecovery()} seems to take a similar approach.
 *
 * @throws Exception if any cluster operation or assertion fails
 */
public void testRecoveryCountConsistency() throws Exception {

    // This class defines only a single index name constant, and this test needs several indices.
    final String[] indices = { "text-idx-1", "text-idx-2", "text-idx-3", "text-idx-4" };
    // Built once so the assertion lambdas below do not re-wrap the array on every invocation.
    final List<String> indexNames = Arrays.asList(indices);
    final int numDocs = 1000;

    logger.info("--> start node A");
    final String nodeA = internalCluster().startNode();

    // Only the size reported for the last created index is kept; it is used to size the throttle below.
    ByteSizeValue shardSize = null;
    for (String index : indices) {
        logger.info("--> create index on node: {}", nodeA);
        shardSize = createAndPopulateIndex(index, 1, SHARD_COUNT, REPLICA_COUNT, numDocs).getShards()[0].getStats()
            .getStore()
            .size();
    }

    ensureGreen();
    refreshAndWaitForReplication();

    // Counter (a): cluster health must report a settled cluster with one active primary per index.
    ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().execute().actionGet();
    assertEquals(0, clusterHealth.getInitializingShards());
    assertEquals(0, clusterHealth.getRelocatingShards());
    assertEquals(indices.length, clusterHealth.getActivePrimaryShards());

    // Counter (b): a REST "_cat/recovery?active_only" call is not possible in this test setup, so the
    // equivalent transport level recovery request is issued instead.
    RecoveryRequest recoveryRequest = new RecoveryRequest();
    // We are going to throttle the recovery process, hence we do not want the request to time out.
    recoveryRequest.timeout(TimeValue.MAX_VALUE);
    recoveryRequest.detailed(false);
    recoveryRequest.activeOnly(true);

    RecoveryResponse recoveryResponse = client().admin().indices().recoveries(recoveryRequest).actionGet();
    assertTrue(recoveryResponse.hasRecoveries());
    assertEquals(indices.length, recoveryResponse.shardRecoveryStates().size());
    recoveryResponse.shardRecoveryStates().forEach((indexName, recoveryStates) -> {
        assertTrue(indexNames.contains(indexName));
        // Initially, the (only) index shard has been recovered so there is no active recovery going on.
        assertEquals(0, recoveryStates.size());
    });

    logger.info("--> slowing down recoveries");
    slowDownRecovery(shardSize);

    logger.info("--> disable cluster rebalancing");
    ClusterUpdateSettingsResponse settingsUpdateResponse = client().admin()
        .cluster()
        .prepareUpdateSettings()
        .setTransientSettings(
            Settings.builder()
                .put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)
        )
        .get();
    assertAcked(settingsUpdateResponse);

    logger.info("--> start node B");
    final String nodeB = internalCluster().startNode();

    // No shards want to rebalance, all stay at node A.
    ensureGreen();

    for (String index : indices) {
        logger.info("--> move shard from: {} to: {}", nodeA, nodeB);
        client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(index, 0, nodeA, nodeB)).execute().actionGet();
    }

    logger.info("--> waiting for recovery to start both on source and target");
    for (String indexName : indices) {
        final Index index = resolveIndex(indexName);
        assertBusyWithFixedSleepTime(() -> {
            IndicesService sourceIndices = internalCluster().getInstance(IndicesService.class, nodeA);
            assertThat(sourceIndices.indexServiceSafe(index).getShard(0).recoveryStats().currentAsSource(), equalTo(1));
            IndicesService targetIndices = internalCluster().getInstance(IndicesService.class, nodeB);
            assertThat(targetIndices.indexServiceSafe(index).getShard(0).recoveryStats().currentAsTarget(), equalTo(1));
        }, TimeValue.timeValueSeconds(10), TimeValue.timeValueMillis(500));
    }

    // We expect all shards to be relocating now (every index has a single shard that
    // is relocating from node A to node B).
    clusterHealth = client().admin().cluster().prepareHealth().execute().actionGet();
    assertEquals(indices.length, clusterHealth.getRelocatingShards());

    recoveryResponse = client().admin().indices().recoveries(recoveryRequest).actionGet();
    assertTrue(recoveryResponse.hasRecoveries());
    assertEquals(indices.length, recoveryResponse.shardRecoveryStates().size());
    recoveryResponse.shardRecoveryStates().forEach((indexName, recoveryStates) -> {
        assertTrue(indexNames.contains(indexName));
        // Exactly one active recovery per index, and it must still be in the INDEX stage.
        assertEquals(1, recoveryStates.size());
        recoveryStates.forEach(recoveryState -> assertEquals(Stage.INDEX, recoveryState.getStage()));
    });

    // Lift the throttle and let the relocations complete before the test ends.
    restoreRecoverySpeed();
    refreshAndWaitForReplication();
    ensureGreen();
}

public void testTransientErrorsDuringRecoveryAreRetried() throws Exception {
final String indexName = "test";
final Settings nodeSettings = Settings.builder()
Expand Down
Loading