Skip to content

Commit

Permalink
Force merge API supports performing only on primary shards (opensearc…
Browse files Browse the repository at this point in the history
…h-project#11269)

* Force merge API supports performing on primary shards only

Signed-off-by: Gao Binlong <[email protected]>

* Modify change log

Signed-off-by: Gao Binlong <[email protected]>

* Fix test failure

Signed-off-by: Gao Binlong <[email protected]>

* Fix typo

Signed-off-by: Gao Binlong <[email protected]>

* Modify skip version

Signed-off-by: Gao Binlong <[email protected]>

* Add version check and more tests

Signed-off-by: Gao Binlong <[email protected]>

* Format code

Signed-off-by: Gao Binlong <[email protected]>

* Modify supported version and add more test

Signed-off-by: Gao Binlong <[email protected]>

* Change the supported version to 3.0.0

Signed-off-by: Gao Binlong <[email protected]>

* Add test case in SegmentReplicationIT

Signed-off-by: Gao Binlong <[email protected]>

* Optimize the test code

Signed-off-by: Gao Binlong <[email protected]>

---------

Signed-off-by: Gao Binlong <[email protected]>
  • Loading branch information
gaobinlong authored and rayshrey committed Mar 18, 2024
1 parent b4a77e0 commit 07f6a8e
Show file tree
Hide file tree
Showing 13 changed files with 260 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add toString methods to MultiSearchRequest, MultiGetRequest and CreateIndexRequest ([#12163](https://github.com/opensearch-project/OpenSearch/pull/12163))
- Support for returning scores in matched queries ([#11626](https://github.com/opensearch-project/OpenSearch/pull/11626))
- Add shard id property to SearchLookup for use in field types provided by plugins ([#1063](https://github.com/opensearch-project/OpenSearch/pull/1063))
- Force merge API supports performing on primary shards only ([#11269](https://github.com/opensearch-project/OpenSearch/pull/11269))
- [Tiered caching] Make IndicesRequestCache implementation configurable [EXPERIMENTAL] ([#12533](https://github.com/opensearch-project/OpenSearch/pull/12533))
- Add kuromoji_completion analyzer and filter ([#4835](https://github.com/opensearch-project/OpenSearch/issues/4835))
- The org.opensearch.bootstrap.Security should support codebase for JAR files with classifiers ([#12586](https://github.com/opensearch-project/OpenSearch/issues/12586))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@
"wait_for_completion": {
"type" : "boolean",
"description" : "If false, the request will return a task immediately and the operation will run in background. Defaults to true."
},
"primary_only": {
"type" : "boolean",
"description" : "Specify whether the operation should only perform on primary shards. Defaults to false."
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,23 @@
index: test
max_num_segments: 10
only_expunge_deletes: true

---
"Test primary_only parameter":
- skip:
version: " - 2.99.99"
reason: "primary_only is available in 3.0+"

- do:
indices.create:
index: test
body:
settings:
index.number_of_shards: 2
index.number_of_replicas: 1

- do:
indices.forcemerge:
index: test
primary_only: true
- match: { _shards.total: 2 }
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
wait_for_completion: true
task_id: $taskId
- match: { task.action: "indices:admin/forcemerge" }
- match: { task.description: "Force-merge indices [test_index], maxSegments[1], onlyExpungeDeletes[false], flush[true]" }
- match: { task.description: "Force-merge indices [test_index], maxSegments[1], onlyExpungeDeletes[false], flush[true], primaryOnly[false]" }

# .tasks index is created when the force-merge operation completes, so we should delete .tasks index finally,
# if not, the .tasks index may introduce unexpected warnings and then cause other test cases to fail.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,24 @@ public void testForceMergeUUIDConsistent() throws IOException {
assertThat(primaryForceMergeUUID, is(replicaForceMergeUUID));
}

public void testForceMergeOnlyOnPrimaryShards() throws IOException {
internalCluster().ensureAtLeastNumDataNodes(2);
final String index = "test-index";
createIndex(
index,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()
);
ensureGreen(index);
final ForceMergeResponse forceMergeResponse = client().admin()
.indices()
.prepareForceMerge(index)
.setMaxNumSegments(1)
.setPrimaryOnly(true)
.get();
assertThat(forceMergeResponse.getFailedShards(), is(0));
assertThat(forceMergeResponse.getSuccessfulShards(), is(1));
}

private static String getForceMergeUUID(IndexShard indexShard) throws IOException {
try (GatedCloseable<IndexCommit> wrappedIndexCommit = indexShard.acquireLastIndexCommit(true)) {
return wrappedIndexCommit.get().getUserData().get(Engine.FORCE_MERGE_UUID_KEY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.get.GetResponse;
Expand Down Expand Up @@ -400,6 +401,14 @@ public void testMultipleShards() throws Exception {
}

public void testReplicationAfterForceMerge() throws Exception {
performReplicationAfterForceMerge(false, SHARD_COUNT * (1 + REPLICA_COUNT));
}

public void testReplicationAfterForceMergeOnPrimaryShardsOnly() throws Exception {
performReplicationAfterForceMerge(true, SHARD_COUNT);
}

private void performReplicationAfterForceMerge(boolean primaryOnly, int expectedSuccessfulShards) throws Exception {
final String nodeA = internalCluster().startDataOnlyNode();
final String nodeB = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
Expand Down Expand Up @@ -430,8 +439,16 @@ public void testReplicationAfterForceMerge() throws Exception {
waitForDocs(expectedHitCount, indexer);
waitForSearchableDocs(expectedHitCount, nodeA, nodeB);

// Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
// Perform force merge only on the primary shards.
final ForceMergeResponse forceMergeResponse = client().admin()
.indices()
.prepareForceMerge(INDEX_NAME)
.setPrimaryOnly(primaryOnly)
.setMaxNumSegments(1)
.setFlush(false)
.get();
assertThat(forceMergeResponse.getFailedShards(), is(0));
assertThat(forceMergeResponse.getSuccessfulShards(), is(expectedSuccessfulShards));
refresh(INDEX_NAME);
verifyStoreContent();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,13 @@ public static final class Defaults {
public static final int MAX_NUM_SEGMENTS = -1;
public static final boolean ONLY_EXPUNGE_DELETES = false;
public static final boolean FLUSH = true;
public static final boolean PRIMARY_ONLY = false;
}

private int maxNumSegments = Defaults.MAX_NUM_SEGMENTS;
private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES;
private boolean flush = Defaults.FLUSH;
private boolean primaryOnly = Defaults.PRIMARY_ONLY;

private static final Version FORCE_MERGE_UUID_VERSION = Version.V_3_0_0;

Expand All @@ -100,6 +102,9 @@ public ForceMergeRequest(StreamInput in) throws IOException {
maxNumSegments = in.readInt();
onlyExpungeDeletes = in.readBoolean();
flush = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
primaryOnly = in.readBoolean();
}
if (in.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) {
forceMergeUUID = in.readString();
} else if ((forceMergeUUID = in.readOptionalString()) == null) {
Expand Down Expand Up @@ -166,6 +171,21 @@ public ForceMergeRequest flush(boolean flush) {
return this;
}

/**
* Should force merge only performed on primary shards. Defaults to {@code false}.
*/
public boolean primaryOnly() {
return primaryOnly;
}

/**
* Should force merge only performed on primary shards. Defaults to {@code false}.
*/
public ForceMergeRequest primaryOnly(boolean primaryOnly) {
this.primaryOnly = primaryOnly;
return this;
}

/**
* Should this task store its result after it has finished?
*/
Expand All @@ -188,6 +208,8 @@ public String getDescription() {
+ onlyExpungeDeletes
+ "], flush["
+ flush
+ "], primaryOnly["
+ primaryOnly
+ "]";
}

Expand All @@ -197,6 +219,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeInt(maxNumSegments);
out.writeBoolean(onlyExpungeDeletes);
out.writeBoolean(flush);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(primaryOnly);
}
if (out.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) {
out.writeString(forceMergeUUID);
} else {
Expand All @@ -213,6 +238,8 @@ public String toString() {
+ onlyExpungeDeletes
+ ", flush="
+ flush
+ ", primaryOnly="
+ primaryOnly
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,12 @@ public ForceMergeRequestBuilder setFlush(boolean flush) {
request.flush(flush);
return this;
}

/**
* Should force merge only performed on primary shards. Defaults to {@code false}.
*/
public ForceMergeRequestBuilder setPrimaryOnly(boolean primaryOnly) {
request.primaryOnly(primaryOnly);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,16 @@ protected EmptyResult shardOperation(ForceMergeRequest request, ShardRouting sha
}

/**
* The refresh request works against *all* shards.
* The force merge request works against *all* shards by default, but it can work against all primary shards only
* by setting primary_only to true.
*/
@Override
protected ShardsIterator shards(ClusterState clusterState, ForceMergeRequest request, String[] concreteIndices) {
return clusterState.routingTable().allShards(concreteIndices);
if (request.primaryOnly()) {
return clusterState.routingTable().allShardsSatisfyingPredicate(concreteIndices, ShardRouting::primary);
} else {
return clusterState.routingTable().allShards(concreteIndices);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,16 @@ public ShardsIterator allShardsSatisfyingPredicate(Predicate<ShardRouting> predi
return allShardsSatisfyingPredicate(indices, predicate, false);
}

/**
* All the shards for the provided indices on the node which match the predicate
* @param indices indices to return all the shards.
* @param predicate condition to match
* @return iterator over shards matching the predicate for the specific indices
*/
public ShardsIterator allShardsSatisfyingPredicate(String[] indices, Predicate<ShardRouting> predicate) {
return allShardsSatisfyingPredicate(indices, predicate, false);
}

private ShardsIterator allShardsSatisfyingPredicate(
String[] indices,
Predicate<ShardRouting> predicate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
mergeRequest.maxNumSegments(request.paramAsInt("max_num_segments", mergeRequest.maxNumSegments()));
mergeRequest.onlyExpungeDeletes(request.paramAsBoolean("only_expunge_deletes", mergeRequest.onlyExpungeDeletes()));
mergeRequest.flush(request.paramAsBoolean("flush", mergeRequest.flush()));
mergeRequest.primaryOnly(request.paramAsBoolean("primary_only", mergeRequest.primaryOnly()));
if (mergeRequest.onlyExpungeDeletes() && mergeRequest.maxNumSegments() != ForceMergeRequest.Defaults.MAX_NUM_SEGMENTS) {
deprecationLogger.deprecate(
"force_merge_expunge_deletes_and_max_num_segments_deprecation",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,137 @@

package org.opensearch.action.admin.indices.forcemerge;

import org.opensearch.Version;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.VersionUtils;

public class ForceMergeRequestTests extends OpenSearchTestCase {

public void testDescription() {
ForceMergeRequest request = new ForceMergeRequest();
assertEquals("Force-merge indices [], maxSegments[-1], onlyExpungeDeletes[false], flush[true]", request.getDescription());
assertEquals(
"Force-merge indices [], maxSegments[-1], onlyExpungeDeletes[false], flush[true], primaryOnly[false]",
request.getDescription()
);

request = new ForceMergeRequest("shop", "blog");
assertEquals("Force-merge indices [shop, blog], maxSegments[-1], onlyExpungeDeletes[false], flush[true]", request.getDescription());
assertEquals(
"Force-merge indices [shop, blog], maxSegments[-1], onlyExpungeDeletes[false], flush[true], primaryOnly[false]",
request.getDescription()
);

request = new ForceMergeRequest();
request.maxNumSegments(12);
request.onlyExpungeDeletes(true);
request.flush(false);
assertEquals("Force-merge indices [], maxSegments[12], onlyExpungeDeletes[true], flush[false]", request.getDescription());
request.primaryOnly(true);
assertEquals(
"Force-merge indices [], maxSegments[12], onlyExpungeDeletes[true], flush[false], primaryOnly[true]",
request.getDescription()
);
}

public void testToString() {
ForceMergeRequest request = new ForceMergeRequest();
assertEquals("ForceMergeRequest{maxNumSegments=-1, onlyExpungeDeletes=false, flush=true, primaryOnly=false}", request.toString());

request = new ForceMergeRequest();
request.maxNumSegments(12);
request.onlyExpungeDeletes(true);
request.flush(false);
request.primaryOnly(true);
assertEquals("ForceMergeRequest{maxNumSegments=12, onlyExpungeDeletes=true, flush=false, primaryOnly=true}", request.toString());
}

public void testSerialization() throws Exception {
final ForceMergeRequest request = randomRequest();
try (BytesStreamOutput out = new BytesStreamOutput()) {
request.writeTo(out);

final ForceMergeRequest deserializedRequest;
try (StreamInput in = out.bytes().streamInput()) {
deserializedRequest = new ForceMergeRequest(in);
}
assertEquals(request.maxNumSegments(), deserializedRequest.maxNumSegments());
assertEquals(request.onlyExpungeDeletes(), deserializedRequest.onlyExpungeDeletes());
assertEquals(request.flush(), deserializedRequest.flush());
assertEquals(request.primaryOnly(), deserializedRequest.primaryOnly());
assertEquals(request.forceMergeUUID(), deserializedRequest.forceMergeUUID());
}
}

public void testBwcSerialization() throws Exception {
{
final ForceMergeRequest sample = randomRequest();
final Version compatibleVersion = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT);
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.setVersion(compatibleVersion);
sample.writeTo(out);

final ForceMergeRequest deserializedRequest;
try (StreamInput in = out.bytes().streamInput()) {
in.setVersion(Version.CURRENT);
deserializedRequest = new ForceMergeRequest(in);
}

assertEquals(sample.maxNumSegments(), deserializedRequest.maxNumSegments());
assertEquals(sample.onlyExpungeDeletes(), deserializedRequest.onlyExpungeDeletes());
assertEquals(sample.flush(), deserializedRequest.flush());
if (compatibleVersion.onOrAfter(Version.V_3_0_0)) {
assertEquals(sample.primaryOnly(), deserializedRequest.primaryOnly());
assertEquals(sample.forceMergeUUID(), deserializedRequest.forceMergeUUID());
}
}
}

{
final ForceMergeRequest sample = randomRequest();
final Version compatibleVersion = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT);
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.setVersion(Version.CURRENT);
sample.getParentTask().writeTo(out);
out.writeStringArray(sample.indices());
sample.indicesOptions().writeIndicesOptions(out);
out.writeInt(sample.maxNumSegments());
out.writeBoolean(sample.onlyExpungeDeletes());
out.writeBoolean(sample.flush());
if (compatibleVersion.onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(sample.primaryOnly());
}
if (compatibleVersion.onOrAfter(Version.V_3_0_0)) {
out.writeString(sample.forceMergeUUID());
} else {
out.writeOptionalString(sample.forceMergeUUID());
}

final ForceMergeRequest deserializedRequest;
try (StreamInput in = out.bytes().streamInput()) {
in.setVersion(compatibleVersion);
deserializedRequest = new ForceMergeRequest(in);
}

assertEquals(sample.maxNumSegments(), deserializedRequest.maxNumSegments());
assertEquals(sample.onlyExpungeDeletes(), deserializedRequest.onlyExpungeDeletes());
assertEquals(sample.flush(), deserializedRequest.flush());
if (compatibleVersion.onOrAfter(Version.V_3_0_0)) {
assertEquals(sample.primaryOnly(), deserializedRequest.primaryOnly());
}
assertEquals(sample.forceMergeUUID(), deserializedRequest.forceMergeUUID());

}
}
}

private ForceMergeRequest randomRequest() {
ForceMergeRequest request = new ForceMergeRequest();
if (randomBoolean()) {
request.maxNumSegments(randomIntBetween(1, 10));
}
request.onlyExpungeDeletes(true);
request.flush(randomBoolean());
request.primaryOnly(randomBoolean());
return request;
}
}
Loading

0 comments on commit 07f6a8e

Please sign in to comment.