Skip to content

Commit

Permalink
[Backport 2.x] Force merge API supports performing only on primary sh…
Browse files Browse the repository at this point in the history
…ards (#11269) (#12609)

* Force merge API supports performing only on primary shards (#11269)

* Force merge API supports performing on primary shards only

Signed-off-by: Gao Binlong <[email protected]>

* Modify change log

Signed-off-by: Gao Binlong <[email protected]>

* Fix test failure

Signed-off-by: Gao Binlong <[email protected]>

* Fix typo

Signed-off-by: Gao Binlong <[email protected]>

* Modify skip version

Signed-off-by: Gao Binlong <[email protected]>

* Add version check and more tests

Signed-off-by: Gao Binlong <[email protected]>

* Format code

Signed-off-by: Gao Binlong <[email protected]>

* Modify supported version and add more test

Signed-off-by: Gao Binlong <[email protected]>

* Change the supported version to 3.0.0

Signed-off-by: Gao Binlong <[email protected]>

* Add test case in SegmentReplicationIT

Signed-off-by: Gao Binlong <[email protected]>

* Optimize the test code

Signed-off-by: Gao Binlong <[email protected]>

---------

Signed-off-by: Gao Binlong <[email protected]>
(cherry picked from commit e6eec36)

* Modify supported version to 2.13.0

Signed-off-by: Gao Binlong <[email protected]>

* Fix test failure

Signed-off-by: Gao Binlong <[email protected]>

* Fix yml test failure

Signed-off-by: Gao Binlong <[email protected]>

* Fix yaml test failure

Signed-off-by: Gao Binlong <[email protected]>

* Fix typo and test failure

Signed-off-by: Gao Binlong <[email protected]>

---------

Signed-off-by: Gao Binlong <[email protected]>
  • Loading branch information
gaobinlong authored Mar 13, 2024
1 parent 5bb0078 commit 0dfc846
Show file tree
Hide file tree
Showing 13 changed files with 287 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix error in RemoteSegmentStoreDirectory when debug logging is enabled ([#12328](https://github.com/opensearch-project/OpenSearch/pull/12328))
- Support for returning scores in matched queries ([#11626](https://github.com/opensearch-project/OpenSearch/pull/11626))
- Add shard id property to SearchLookup for use in field types provided by plugins ([#1063](https://github.com/opensearch-project/OpenSearch/pull/1063))
- Force merge API supports performing on primary shards only ([#11269](https://github.com/opensearch-project/OpenSearch/pull/11269))
- Add kuromoji_completion analyzer and filter ([#4835](https://github.com/opensearch-project/OpenSearch/issues/4835))
- [Admission Control] Integrate IO Usage Tracker to the Resource Usage Collector Service and Emit IO Usage Stats ([#11880](https://github.com/opensearch-project/OpenSearch/pull/11880))
- The org.opensearch.bootstrap.Security should support codebase for JAR files with classifiers ([#12586](https://github.com/opensearch-project/OpenSearch/issues/12586))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@
"wait_for_completion": {
"type" : "boolean",
"description" : "If false, the request will return a task immediately and the operation will run in background. Defaults to true."
},
"primary_only": {
"type" : "boolean",
"description" : "Specify whether the operation should only perform on primary shards. Defaults to false."
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,23 @@
index: test
max_num_segments: 10
only_expunge_deletes: true

---
"Test primary_only parameter":
- skip:
version: " - 2.12.99"
reason: "primary_only is available in 2.13.0+"

- do:
indices.create:
index: test
body:
settings:
index.number_of_shards: 2
index.number_of_replicas: 1

- do:
indices.forcemerge:
index: test
primary_only: true
- match: { _shards.total: 2 }
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@
# will return a task immediately and the merge process will run in background.

- skip:
version: " - 2.6.99"
reason: "only available in 2.7+"
features: allowed_warnings
version: " - 2.6.99, 2.13.0 - "
reason: "wait_for_completion was introduced in 2.7.0 and task description was changed in 2.13.0"
features: allowed_warnings, node_selector

- do:
indices.create:
index: test_index

- do:
node_selector:
version: " 2.7.0 - 2.12.99"
indices.forcemerge:
index: test_index
wait_for_completion: false
Expand All @@ -27,6 +29,30 @@
- match: { task.action: "indices:admin/forcemerge" }
- match: { task.description: "Force-merge indices [test_index], maxSegments[1], onlyExpungeDeletes[false], flush[true]" }

---
"Force merge index with wait_for_completion after task description changed":
- skip:
version: " - 2.12.99 "
reason: "task description was changed in 2.13.0"
features: allowed_warnings, node_selector

- do:
node_selector:
version: " 2.13.0 - "
indices.forcemerge:
index: test_index
wait_for_completion: false
max_num_segments: 1
- match: { task: /^.+$/ }
- set: { task: taskId }

- do:
tasks.get:
wait_for_completion: true
task_id: $taskId
- match: { task.action: "indices:admin/forcemerge" }
- match: { task.description: "Force-merge indices [test_index], maxSegments[1], onlyExpungeDeletes[false], flush[true], primaryOnly[false]" }

# .tasks index is created when the force-merge operation completes, so we should delete .tasks index finally,
# if not, the .tasks index may introduce unexpected warnings and then cause other test cases to fail.
# Delete the .tasks index directly will also introduce warning, but currently we don't have such APIs which can delete one
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,24 @@ public void testForceMergeUUIDConsistent() throws IOException {
assertThat(primaryForceMergeUUID, is(replicaForceMergeUUID));
}

public void testForceMergeOnlyOnPrimaryShards() throws IOException {
internalCluster().ensureAtLeastNumDataNodes(2);
final String index = "test-index";
createIndex(
index,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()
);
ensureGreen(index);
final ForceMergeResponse forceMergeResponse = client().admin()
.indices()
.prepareForceMerge(index)
.setMaxNumSegments(1)
.setPrimaryOnly(true)
.get();
assertThat(forceMergeResponse.getFailedShards(), is(0));
assertThat(forceMergeResponse.getSuccessfulShards(), is(1));
}

private static String getForceMergeUUID(IndexShard indexShard) throws IOException {
try (GatedCloseable<IndexCommit> wrappedIndexCommit = indexShard.acquireLastIndexCommit(true)) {
return wrappedIndexCommit.get().getUserData().get(Engine.FORCE_MERGE_UUID_KEY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.get.GetResponse;
Expand Down Expand Up @@ -400,6 +401,14 @@ public void testMultipleShards() throws Exception {
}

public void testReplicationAfterForceMerge() throws Exception {
performReplicationAfterForceMerge(false, SHARD_COUNT * (1 + REPLICA_COUNT));
}

public void testReplicationAfterForceMergeOnPrimaryShardsOnly() throws Exception {
performReplicationAfterForceMerge(true, SHARD_COUNT);
}

private void performReplicationAfterForceMerge(boolean primaryOnly, int expectedSuccessfulShards) throws Exception {
final String nodeA = internalCluster().startDataOnlyNode();
final String nodeB = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
Expand Down Expand Up @@ -430,8 +439,16 @@ public void testReplicationAfterForceMerge() throws Exception {
waitForDocs(expectedHitCount, indexer);
waitForSearchableDocs(expectedHitCount, nodeA, nodeB);

// Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
// Perform force merge only on the primary shards.
final ForceMergeResponse forceMergeResponse = client().admin()
.indices()
.prepareForceMerge(INDEX_NAME)
.setPrimaryOnly(primaryOnly)
.setMaxNumSegments(1)
.setFlush(false)
.get();
assertThat(forceMergeResponse.getFailedShards(), is(0));
assertThat(forceMergeResponse.getSuccessfulShards(), is(expectedSuccessfulShards));
refresh(INDEX_NAME);
verifyStoreContent();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,13 @@ public static final class Defaults {
public static final int MAX_NUM_SEGMENTS = -1;
public static final boolean ONLY_EXPUNGE_DELETES = false;
public static final boolean FLUSH = true;
public static final boolean PRIMARY_ONLY = false;
}

private int maxNumSegments = Defaults.MAX_NUM_SEGMENTS;
private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES;
private boolean flush = Defaults.FLUSH;
private boolean primaryOnly = Defaults.PRIMARY_ONLY;

private static final Version FORCE_MERGE_UUID_VERSION = LegacyESVersion.V_7_7_0;

Expand Down Expand Up @@ -102,6 +104,9 @@ public ForceMergeRequest(StreamInput in) throws IOException {
maxNumSegments = in.readInt();
onlyExpungeDeletes = in.readBoolean();
flush = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_2_13_0)) {
primaryOnly = in.readBoolean();
}
if (in.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) {
forceMergeUUID = in.readOptionalString();
} else {
Expand Down Expand Up @@ -167,6 +172,21 @@ public ForceMergeRequest flush(boolean flush) {
return this;
}

/**
* Should force merge only performed on primary shards. Defaults to {@code false}.
*/
public boolean primaryOnly() {
return primaryOnly;
}

/**
* Should force merge only performed on primary shards. Defaults to {@code false}.
*/
public ForceMergeRequest primaryOnly(boolean primaryOnly) {
this.primaryOnly = primaryOnly;
return this;
}

/**
* Should this task store its result after it has finished?
*/
Expand All @@ -189,6 +209,8 @@ public String getDescription() {
+ onlyExpungeDeletes
+ "], flush["
+ flush
+ "], primaryOnly["
+ primaryOnly
+ "]";
}

Expand All @@ -198,6 +220,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeInt(maxNumSegments);
out.writeBoolean(onlyExpungeDeletes);
out.writeBoolean(flush);
if (out.getVersion().onOrAfter(Version.V_2_13_0)) {
out.writeBoolean(primaryOnly);
}
if (out.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) {
out.writeOptionalString(forceMergeUUID);
}
Expand All @@ -212,6 +237,8 @@ public String toString() {
+ onlyExpungeDeletes
+ ", flush="
+ flush
+ ", primaryOnly="
+ primaryOnly
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,12 @@ public ForceMergeRequestBuilder setFlush(boolean flush) {
request.flush(flush);
return this;
}

/**
* Should force merge only performed on primary shards. Defaults to {@code false}.
*/
public ForceMergeRequestBuilder setPrimaryOnly(boolean primaryOnly) {
request.primaryOnly(primaryOnly);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,16 @@ protected EmptyResult shardOperation(ForceMergeRequest request, ShardRouting sha
}

/**
* The refresh request works against *all* shards.
* The force merge request works against *all* shards by default, but it can work against all primary shards only
* by setting primary_only to true.
*/
@Override
protected ShardsIterator shards(ClusterState clusterState, ForceMergeRequest request, String[] concreteIndices) {
return clusterState.routingTable().allShards(concreteIndices);
if (request.primaryOnly()) {
return clusterState.routingTable().allShardsSatisfyingPredicate(concreteIndices, ShardRouting::primary);
} else {
return clusterState.routingTable().allShards(concreteIndices);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,16 @@ public ShardsIterator allShardsSatisfyingPredicate(Predicate<ShardRouting> predi
return allShardsSatisfyingPredicate(indices, predicate, false);
}

/**
* All the shards for the provided indices on the node which match the predicate
* @param indices indices to return all the shards.
* @param predicate condition to match
* @return iterator over shards matching the predicate for the specific indices
*/
public ShardsIterator allShardsSatisfyingPredicate(String[] indices, Predicate<ShardRouting> predicate) {
return allShardsSatisfyingPredicate(indices, predicate, false);
}

private ShardsIterator allShardsSatisfyingPredicate(
String[] indices,
Predicate<ShardRouting> predicate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
mergeRequest.maxNumSegments(request.paramAsInt("max_num_segments", mergeRequest.maxNumSegments()));
mergeRequest.onlyExpungeDeletes(request.paramAsBoolean("only_expunge_deletes", mergeRequest.onlyExpungeDeletes()));
mergeRequest.flush(request.paramAsBoolean("flush", mergeRequest.flush()));
mergeRequest.primaryOnly(request.paramAsBoolean("primary_only", mergeRequest.primaryOnly()));
if (mergeRequest.onlyExpungeDeletes() && mergeRequest.maxNumSegments() != ForceMergeRequest.Defaults.MAX_NUM_SEGMENTS) {
deprecationLogger.deprecate(
"force_merge_expunge_deletes_and_max_num_segments_deprecation",
Expand Down
Loading

0 comments on commit 0dfc846

Please sign in to comment.