[Segment Replication] Fix shrink Index bug with segment replication. (opensearch-project#9711)

* Fix shrink Index bug with segment replication.

Signed-off-by: Rishikesh1159 <[email protected]>

* Apply spotless and refactor.

Signed-off-by: Rishikesh1159 <[email protected]>

* Fix compile errors.

Signed-off-by: Rishikesh1159 <[email protected]>

* Address comments on PR.

Signed-off-by: Rishikesh1159 <[email protected]>

* Add bytes-behind check and remove doc count check.

Signed-off-by: Rishikesh1159 <[email protected]>

---------

Signed-off-by: Rishikesh1159 <[email protected]>
Rishikesh1159 authored Sep 5, 2023
1 parent a737446 commit 9602d1d
Showing 2 changed files with 322 additions and 19 deletions.
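The fix gates a shrink on the segment-replication lag reported by the index stats API. As a minimal pre-flight sketch of the same check from a caller's side (a hypothetical helper, not part of the commit; it relies only on the stats accessors and the maxBytesBehind field that appear in the diff below):

import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.client.Client;

public final class ShrinkPreflight {
    private ShrinkPreflight() {}

    /** Returns true when no replica of {@code index} is behind its primary by any bytes. */
    public static boolean replicasCaughtUp(Client client, String index) {
        // Refresh first so the primary's latest segments are reflected in the stats.
        client.admin().indices().prepareRefresh(index).get();
        IndicesStatsResponse stats = client.admin()
            .indices()
            .prepareStats(index)
            .clear()
            .setSegments(true)
            .get();
        // maxBytesBehind == 0 means every replica has caught up with its primary.
        return stats.getIndex(index).getTotal().getSegments().getReplicationStats().maxBytesBehind == 0;
    }
}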
@@ -0,0 +1,248 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.action.admin.indices.shrink.ResizeType;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.index.query.TermsQueryBuilder;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

/**
* This test class verifies resize requests (shrink, split, clone) with segment replication as the replication strategy.
*/
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2)
public class SegmentReplicationResizeRequestIT extends SegmentReplicationBaseIT {

public void testCreateShrinkIndexThrowsExceptionWhenReplicasBehind() throws Exception {

// Create the index with refresh_interval -1; we block segment replication and want to control refreshes ourselves.
prepareCreate("test").setSettings(
Settings.builder()
.put(indexSettings())
.put("index.refresh_interval", -1)
.put("index.number_of_replicas", 1)
.put("number_of_shards", 2)
).get();

final Map<String, DiscoveryNode> dataNodes = client().admin().cluster().prepareState().get().getState().nodes().getDataNodes();
assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2);
DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(new DiscoveryNode[0]);
// Ensure all shards are allocated, otherwise the ensureGreen() below might not succeed, since we require the merge node.
// If we change the setting too quickly we will end up with one replica unassigned, which can't be assigned anymore due
// to the require._name below.
ensureGreen();

// block Segment Replication so that replicas never get the docs from primary
CountDownLatch latch = new CountDownLatch(1);
try (final Releasable ignored = blockReplication(List.of(discoveryNodes[0].getName()), latch)) {
final int docs = 500;
for (int i = 0; i < docs; i++) {
client().prepareIndex("test").setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON).get();
}

// block writes on index before performing shrink operation
client().admin()
.indices()
.prepareUpdateSettings("test")
.setSettings(
Settings.builder()
.put("index.routing.allocation.require._name", discoveryNodes[0].getName())
.put("index.blocks.write", true)
)
.get();
ensureGreen();

// Trigger the shrink operation; since the replicas don't have any docs, it should throw an exception saying the replicas haven't caught up.
IllegalStateException exception = assertThrows(
IllegalStateException.class,
() -> client().admin()
.indices()
.prepareResizeIndex("test", "target")
.setResizeType(ResizeType.SHRINK)
.setSettings(
Settings.builder()
.put("index.number_of_replicas", 1)
.putNull("index.blocks.write")
.putNull("index.routing.allocation.require._name")
.build()
)
.get()
);
assertEquals(
"For index [test] replica shards haven't caught up with the primary, please retry after some time.",
exception.getMessage()
);

}

}

public void testCreateSplitIndexWithSegmentReplicationBlocked() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(3);

// Create the index with refresh_interval -1; we block segment replication and want to control refreshes ourselves.
prepareCreate("test").setSettings(
Settings.builder()
.put(indexSettings())
.put("index.refresh_interval", -1)
.put("index.number_of_replicas", 1)
.put("number_of_shards", 3)
).get();

final Map<String, DiscoveryNode> dataNodes = client().admin().cluster().prepareState().get().getState().nodes().getDataNodes();
assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2);
DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(new DiscoveryNode[0]);
// Ensure all shards are allocated before we continue, otherwise the ensureGreen() below might not succeed.
ensureGreen();

CountDownLatch latch = new CountDownLatch(1);

// block Segment Replication so that replicas never get the docs from primary
try (final Releasable ignored = blockReplication(List.of(discoveryNodes[0].getName()), latch)) {
final int docs = 500;
for (int i = 0; i < docs; i++) {
client().prepareIndex("test").setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON).get();
}
refresh();
assertBusy(() -> {
assertHitCount(
client().prepareSearch("test")
.setQuery(new TermsQueryBuilder("foo", "bar"))
.setPreference(Preference.PRIMARY.type())
.get(),
docs
);
});

// block writes on index before performing split operation
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put("index.blocks.write", true)).get();
ensureGreen();

client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none")
)
.get();

// Trigger split operation
assertAcked(
client().admin()
.indices()
.prepareResizeIndex("test", "target")
.setResizeType(ResizeType.SPLIT)
.setSettings(
Settings.builder()
.put("index.number_of_replicas", 1)
.put("index.number_of_shards", 6)
.putNull("index.blocks.write")
.build()
)
.get()
);
ensureGreen();

// verify that all docs are present in new target index
assertHitCount(
client().prepareSearch("target")
.setQuery(new TermsQueryBuilder("foo", "bar"))
.setPreference(Preference.PRIMARY.type())
.get(),
docs
);
}

}

public void testCloneIndex() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(3);

// Create the index (no refresh_interval override is needed here; we refresh explicitly after blocking replication).
prepareCreate("test").setSettings(
Settings.builder().put(indexSettings()).put("index.number_of_replicas", 1).put("number_of_shards", randomIntBetween(1, 5))
).get();

final Map<String, DiscoveryNode> dataNodes = client().admin().cluster().prepareState().get().getState().nodes().getDataNodes();
assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2);
DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(new DiscoveryNode[0]);
// Ensure all shards are allocated before we continue, otherwise the ensureGreen() below might not succeed.
ensureGreen();

CountDownLatch latch = new CountDownLatch(1);

// block Segment Replication so that replicas never get the docs from primary
try (final Releasable ignored = blockReplication(List.of(discoveryNodes[0].getName()), latch)) {
final int docs = 500;
for (int i = 0; i < docs; i++) {
client().prepareIndex("test").setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON).get();
}
refresh();
assertBusy(() -> {
assertHitCount(
client().prepareSearch("test")
.setQuery(new TermsQueryBuilder("foo", "bar"))
.setPreference(Preference.PRIMARY.type())
.get(),
docs
);
});

// block writes on index before performing clone operation
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put("index.blocks.write", true)).get();
ensureGreen();

client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none")
)
.get();

// Trigger the clone operation
assertAcked(
client().admin()
.indices()
.prepareResizeIndex("test", "target")
.setResizeType(ResizeType.CLONE)
.setSettings(Settings.builder().put("index.number_of_replicas", 1).putNull("index.blocks.write").build())
.get()
);
ensureGreen();

// verify that all docs are present in new target index
assertHitCount(
client().prepareSearch("target")
.setQuery(new TermsQueryBuilder("foo", "bar"))
.setPreference(Preference.PRIMARY.type())
.get(),
docs
);
}

}

}
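The blockReplication helper used above is inherited from SegmentReplicationBaseIT. Conceptually it stalls the segment-copy transport action on the given source nodes until the returned Releasable is closed. A rough, hypothetical sketch of that idea (the action name and handler wiring below are assumptions, not copied from the base class):

// Hypothetical sketch of the idea behind blockReplication(...): intercept the
// segment-file transfer action on the source nodes and hold each request until
// the returned Releasable is closed. The action name is an assumption.
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.opensearch.common.lease.Releasable;
import org.opensearch.test.transport.MockTransportService;

final class ReplicationBlocker {
    static Releasable block(List<MockTransportService> sourceNodeTransports, CountDownLatch interceptedSignal) {
        CountDownLatch release = new CountDownLatch(1);
        for (MockTransportService transport : sourceNodeTransports) {
            transport.addRequestHandlingBehavior(
                "internal:index/shard/replication/get_segment_files", // assumed action name
                (handler, request, channel, task) -> {
                    interceptedSignal.countDown(); // tell the test a replication round was caught
                    release.await();               // park the transfer until the block is lifted
                    handler.messageReceived(request, channel, task);
                }
            );
        }
        return release::countDown;
    }
}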
@@ -66,6 +66,8 @@
import java.util.Set;
import java.util.function.IntFunction;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;

/**
* Main class to initiate resizing (shrink / split) an index into a new index
*
@@ -138,25 +140,78 @@ protected void clusterManagerOperation(
// There is no need to fetch doc stats for a split, but we do it anyway to keep the code simple.
final String sourceIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getSourceIndex());
final String targetIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getTargetIndexRequest().index());
client.admin()
.indices()
.prepareStats(sourceIndex)
.clear()
.setDocs(true)
.setStore(true)
.execute(ActionListener.delegateFailure(listener, (delegatedListener, indicesStatsResponse) -> {
CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(resizeRequest, state, i -> {
IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i);
return shard == null ? null : shard.getPrimary().getDocs();
}, indicesStatsResponse.getPrimaries().store, sourceIndex, targetIndex);
createIndexService.createIndex(
updateRequest,
ActionListener.map(
delegatedListener,
response -> new ResizeResponse(response.isAcknowledged(), response.isShardsAcknowledged(), updateRequest.index())
)
);
}));

IndexMetadata indexMetadata = state.metadata().index(sourceIndex);
if (resizeRequest.getResizeType().equals(ResizeType.SHRINK)
&& state.metadata().isSegmentReplicationEnabled(sourceIndex)
&& indexMetadata != null
&& Integer.valueOf(indexMetadata.getSettings().get(SETTING_NUMBER_OF_REPLICAS)) > 0) {
client.admin()
.indices()
.prepareRefresh(sourceIndex)
.execute(ActionListener.delegateFailure(listener, (delegatedRefreshListener, refreshResponse) -> {
client.admin()
.indices()
.prepareStats(sourceIndex)
.clear()
.setDocs(true)
.setStore(true)
.setSegments(true)
.execute(ActionListener.delegateFailure(listener, (delegatedIndicesStatsListener, indicesStatsResponse) -> {
CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(resizeRequest, state, i -> {
IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i);
return shard == null ? null : shard.getPrimary().getDocs();
}, indicesStatsResponse.getPrimaries().store, sourceIndex, targetIndex);

if (indicesStatsResponse.getIndex(sourceIndex)
.getTotal()
.getSegments()
.getReplicationStats().maxBytesBehind != 0) {
throw new IllegalStateException(
"For index ["
+ sourceIndex
+ "] replica shards haven't caught up with the primary, please retry after some time."
);
}

createIndexService.createIndex(
updateRequest,
ActionListener.map(
delegatedIndicesStatsListener,
response -> new ResizeResponse(
response.isAcknowledged(),
response.isShardsAcknowledged(),
updateRequest.index()
)
)
);
}));
}));
} else {
client.admin()
.indices()
.prepareStats(sourceIndex)
.clear()
.setDocs(true)
.setStore(true)
.execute(ActionListener.delegateFailure(listener, (delegatedListener, indicesStatsResponse) -> {
CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(resizeRequest, state, i -> {
IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i);
return shard == null ? null : shard.getPrimary().getDocs();
}, indicesStatsResponse.getPrimaries().store, sourceIndex, targetIndex);
createIndexService.createIndex(
updateRequest,
ActionListener.map(
delegatedListener,
response -> new ResizeResponse(
response.isAcknowledged(),
response.isShardsAcknowledged(),
updateRequest.index()
)
)
);
}));
}

}

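Since the new guard surfaces as an IllegalStateException, a caller that shrinks programmatically may want a bounded retry around the resize request. A minimal, hypothetical sketch (the retry bound and backoff are illustrative; only the resize API calls mirror the tests above):

import org.opensearch.action.admin.indices.shrink.ResizeType;
import org.opensearch.client.Client;

public final class ShrinkWithRetry {
    private ShrinkWithRetry() {}

    public static void shrink(Client client, String source, String target) throws InterruptedException {
        final int maxAttempts = 5;        // illustrative bound, not from the commit
        final long backoffMillis = 2_000; // illustrative backoff, not from the commit
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                client.admin()
                    .indices()
                    .prepareResizeIndex(source, target)
                    .setResizeType(ResizeType.SHRINK)
                    .get();
                return; // shrink accepted
            } catch (IllegalStateException e) {
                // Replicas are still behind the primary; wait and retry.
                Thread.sleep(backoffMillis);
            }
        }
        throw new IllegalStateException("replicas did not catch up after " + maxAttempts + " attempts");
    }
}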
