Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Force merge API supports performing only on primary shards #11269

Merged
merged 21 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Request level coordinator slow logs ([#10650](https://github.com/opensearch-project/OpenSearch/pull/10650))
- Add template snippets support for field and target_field in KV ingest processor ([#10040](https://github.com/opensearch-project/OpenSearch/pull/10040))
- Allowing pipeline processors to access index mapping info by passing ingest service ref as part of the processor factory parameters ([#10307](https://github.com/opensearch-project/OpenSearch/pull/10307))
- Force merge API supports performing on primary shards only ([#11269](https://github.com/opensearch-project/OpenSearch/pull/11269))
- Make number of segment metadata files in remote segment store configurable ([#11329](https://github.com/opensearch-project/OpenSearch/pull/11329))
- Allow changing number of replicas of searchable snapshot index ([#11317](https://github.com/opensearch-project/OpenSearch/pull/11317))
- Adding slf4j license header to LoggerMessageFormat.java ([#11069](https://github.com/opensearch-project/OpenSearch/pull/11069))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@
"wait_for_completion": {
"type" : "boolean",
"description" : "If false, the request will return a task immediately and the operation will run in background. Defaults to true."
},
"primary_only": {
"type" : "boolean",
"description" : "Specify whether the operation should only perform on primary shards. Defaults to false."
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,23 @@
index: test
max_num_segments: 10
only_expunge_deletes: true

---
"Test primary_only parameter":
- skip:
version: " - 2.99.99"
reason: "primary_only is available in 3.0+"

- do:
indices.create:
index: test
body:
settings:
index.number_of_shards: 2
index.number_of_replicas: 1

- do:
indices.forcemerge:
index: test
primary_only: true
- match: { _shards.total: 2 }
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
wait_for_completion: true
task_id: $taskId
- match: { task.action: "indices:admin/forcemerge" }
- match: { task.description: "Force-merge indices [test_index], maxSegments[1], onlyExpungeDeletes[false], flush[true]" }
- match: { task.description: "Force-merge indices [test_index], maxSegments[1], onlyExpungeDeletes[false], flush[true], primaryOnly[false]" }

# .tasks index is created when the force-merge operation completes, so we should delete .tasks index finally,
# if not, the .tasks index may introduce unexpected warnings and then cause other test cases to fail.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,13 @@ public static final class Defaults {
public static final int MAX_NUM_SEGMENTS = -1;
public static final boolean ONLY_EXPUNGE_DELETES = false;
public static final boolean FLUSH = true;
public static final boolean PRIMARY_ONLY = false;
}

private int maxNumSegments = Defaults.MAX_NUM_SEGMENTS;
private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES;
private boolean flush = Defaults.FLUSH;
private boolean primaryOnly = Defaults.PRIMARY_ONLY;

private static final Version FORCE_MERGE_UUID_VERSION = Version.V_3_0_0;

Expand All @@ -100,6 +102,9 @@ public ForceMergeRequest(StreamInput in) throws IOException {
maxNumSegments = in.readInt();
onlyExpungeDeletes = in.readBoolean();
flush = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_2_12_0)) {
gaobinlong marked this conversation as resolved.
Show resolved Hide resolved
primaryOnly = in.readBoolean();
}
if (in.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) {
forceMergeUUID = in.readString();
} else if ((forceMergeUUID = in.readOptionalString()) == null) {
Expand Down Expand Up @@ -166,6 +171,21 @@ public ForceMergeRequest flush(boolean flush) {
return this;
}

/**
* Should force merge only performed on primary shards. Defaults to {@code false}.
*/
public boolean primaryOnly() {
return primaryOnly;
}

/**
* Should force merge only performed on primary shards. Defaults to {@code false}.
*/
public ForceMergeRequest primaryOnly(boolean primaryOnly) {
gaobinlong marked this conversation as resolved.
Show resolved Hide resolved
this.primaryOnly = primaryOnly;
return this;
}

/**
* Should this task store its result after it has finished?
*/
Expand All @@ -188,6 +208,8 @@ public String getDescription() {
+ onlyExpungeDeletes
+ "], flush["
+ flush
+ "], primaryOnly["
+ primaryOnly
+ "]";
}

Expand All @@ -197,6 +219,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeInt(maxNumSegments);
out.writeBoolean(onlyExpungeDeletes);
out.writeBoolean(flush);
if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
out.writeBoolean(primaryOnly);
}
if (out.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) {
out.writeString(forceMergeUUID);
} else {
Expand All @@ -213,6 +238,8 @@ public String toString() {
+ onlyExpungeDeletes
+ ", flush="
+ flush
+ ", primaryOnly="
+ primaryOnly
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,12 @@
request.flush(flush);
return this;
}

/**
* Should force merge only performed on primary shards. Defaults to {@code false}.
*/
public ForceMergeRequestBuilder setPrimaryOnly(boolean primaryOnly) {
request.primaryOnly(primaryOnly);
return this;

Check warning on line 90 in server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestBuilder.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestBuilder.java#L89-L90

Added lines #L89 - L90 were not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,16 @@
}

/**
* The refresh request works against *all* shards.
* The force merge request works against *all* shards by default, but it can work against all primary shards only
* by setting primary_only to true.
*/
@Override
protected ShardsIterator shards(ClusterState clusterState, ForceMergeRequest request, String[] concreteIndices) {
return clusterState.routingTable().allShards(concreteIndices);
if (request.primaryOnly()) {
return clusterState.routingTable().allShardsSatisfyingPredicate(concreteIndices, ShardRouting::primary);

Check warning on line 124 in server/src/main/java/org/opensearch/action/admin/indices/forcemerge/TransportForceMergeAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/indices/forcemerge/TransportForceMergeAction.java#L124

Added line #L124 was not covered by tests
} else {
return clusterState.routingTable().allShards(concreteIndices);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,16 @@ public ShardsIterator allShardsSatisfyingPredicate(Predicate<ShardRouting> predi
return allShardsSatisfyingPredicate(indices, predicate, false);
}

/**
* All the shards for the provided indices on the node which match the predicate
* @param indices indices to return all the shards.
* @param predicate condition to match
* @return iterator over shards matching the predicate for the specific indices
*/
public ShardsIterator allShardsSatisfyingPredicate(String[] indices, Predicate<ShardRouting> predicate) {
return allShardsSatisfyingPredicate(indices, predicate, false);
}

private ShardsIterator allShardsSatisfyingPredicate(
String[] indices,
Predicate<ShardRouting> predicate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
mergeRequest.maxNumSegments(request.paramAsInt("max_num_segments", mergeRequest.maxNumSegments()));
mergeRequest.onlyExpungeDeletes(request.paramAsBoolean("only_expunge_deletes", mergeRequest.onlyExpungeDeletes()));
mergeRequest.flush(request.paramAsBoolean("flush", mergeRequest.flush()));
mergeRequest.primaryOnly(request.paramAsBoolean("primary_only", mergeRequest.primaryOnly()));
if (mergeRequest.onlyExpungeDeletes() && mergeRequest.maxNumSegments() != ForceMergeRequest.Defaults.MAX_NUM_SEGMENTS) {
deprecationLogger.deprecate(
"force_merge_expunge_deletes_and_max_num_segments_deprecation",
Expand Down
gaobinlong marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,137 @@

package org.opensearch.action.admin.indices.forcemerge;

import org.opensearch.Version;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.VersionUtils;

public class ForceMergeRequestTests extends OpenSearchTestCase {

public void testDescription() {
ForceMergeRequest request = new ForceMergeRequest();
assertEquals("Force-merge indices [], maxSegments[-1], onlyExpungeDeletes[false], flush[true]", request.getDescription());
assertEquals(
"Force-merge indices [], maxSegments[-1], onlyExpungeDeletes[false], flush[true], primaryOnly[false]",
request.getDescription()
);

request = new ForceMergeRequest("shop", "blog");
assertEquals("Force-merge indices [shop, blog], maxSegments[-1], onlyExpungeDeletes[false], flush[true]", request.getDescription());
assertEquals(
"Force-merge indices [shop, blog], maxSegments[-1], onlyExpungeDeletes[false], flush[true], primaryOnly[false]",
request.getDescription()
);

request = new ForceMergeRequest();
request.maxNumSegments(12);
request.onlyExpungeDeletes(true);
request.flush(false);
assertEquals("Force-merge indices [], maxSegments[12], onlyExpungeDeletes[true], flush[false]", request.getDescription());
request.primaryOnly(true);
assertEquals(
"Force-merge indices [], maxSegments[12], onlyExpungeDeletes[true], flush[false], primaryOnly[true]",
request.getDescription()
);
}

public void testToString() {
ForceMergeRequest request = new ForceMergeRequest();
assertEquals("ForceMergeRequest{maxNumSegments=-1, onlyExpungeDeletes=false, flush=true, primaryOnly=false}", request.toString());

request = new ForceMergeRequest();
request.maxNumSegments(12);
request.onlyExpungeDeletes(true);
request.flush(false);
request.primaryOnly(true);
assertEquals("ForceMergeRequest{maxNumSegments=12, onlyExpungeDeletes=true, flush=false, primaryOnly=true}", request.toString());
}

public void testSerialization() throws Exception {
final ForceMergeRequest request = randomRequest();
try (BytesStreamOutput out = new BytesStreamOutput()) {
request.writeTo(out);

final ForceMergeRequest deserializedRequest;
try (StreamInput in = out.bytes().streamInput()) {
deserializedRequest = new ForceMergeRequest(in);
}
assertEquals(request.maxNumSegments(), deserializedRequest.maxNumSegments());
assertEquals(request.onlyExpungeDeletes(), deserializedRequest.onlyExpungeDeletes());
assertEquals(request.flush(), deserializedRequest.flush());
assertEquals(request.primaryOnly(), deserializedRequest.primaryOnly());
assertEquals(request.forceMergeUUID(), deserializedRequest.forceMergeUUID());
}
}

public void testBwcSerialization() throws Exception {
{
final ForceMergeRequest sample = randomRequest();
final Version compatibleVersion = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT);
gaobinlong marked this conversation as resolved.
Show resolved Hide resolved
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.setVersion(compatibleVersion);
sample.writeTo(out);

final ForceMergeRequest deserializedRequest;
try (StreamInput in = out.bytes().streamInput()) {
in.setVersion(Version.CURRENT);
deserializedRequest = new ForceMergeRequest(in);
}

assertEquals(sample.maxNumSegments(), deserializedRequest.maxNumSegments());
assertEquals(sample.onlyExpungeDeletes(), deserializedRequest.onlyExpungeDeletes());
assertEquals(sample.flush(), deserializedRequest.flush());
if (compatibleVersion.onOrAfter(Version.V_2_12_0)) {
sohami marked this conversation as resolved.
Show resolved Hide resolved
assertEquals(sample.primaryOnly(), deserializedRequest.primaryOnly());
if (compatibleVersion.onOrAfter(Version.V_3_0_0)) {
assertEquals(sample.forceMergeUUID(), deserializedRequest.forceMergeUUID());
}
}
}
}

{
final ForceMergeRequest sample = randomRequest();
final Version compatibleVersion = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT);
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.setVersion(Version.CURRENT);
sample.getParentTask().writeTo(out);
out.writeStringArray(sample.indices());
sample.indicesOptions().writeIndicesOptions(out);
out.writeInt(sample.maxNumSegments());
out.writeBoolean(sample.onlyExpungeDeletes());
out.writeBoolean(sample.flush());
if (compatibleVersion.onOrAfter(Version.V_2_12_0)) {
out.writeBoolean(sample.primaryOnly());
}
if (compatibleVersion.onOrAfter(Version.V_3_0_0)) {
out.writeString(sample.forceMergeUUID());
} else {
out.writeOptionalString(sample.forceMergeUUID());
}

final ForceMergeRequest deserializedRequest;
try (StreamInput in = out.bytes().streamInput()) {
in.setVersion(compatibleVersion);
deserializedRequest = new ForceMergeRequest(in);
}

assertEquals(sample.maxNumSegments(), deserializedRequest.maxNumSegments());
assertEquals(sample.onlyExpungeDeletes(), deserializedRequest.onlyExpungeDeletes());
assertEquals(sample.flush(), deserializedRequest.flush());
assertEquals(sample.primaryOnly(), deserializedRequest.primaryOnly());
assertEquals(sample.forceMergeUUID(), deserializedRequest.forceMergeUUID());

}
}
}

private ForceMergeRequest randomRequest() {
ForceMergeRequest request = new ForceMergeRequest();
if (randomBoolean()) {
request.maxNumSegments(randomIntBetween(1, 10));
}
request.onlyExpungeDeletes(true);
request.flush(randomBoolean());
request.primaryOnly(randomBoolean());
return request;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,31 @@ public void testAllShardsMatchingPredicate() {
);
}

public void testAllShardsMatchingPredicateWithSpecificIndices() {
MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.put(IndexMetadata.builder("test2").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.build();
ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(RoutingTable.builder().addAsNew(metadata.index("test1")).addAsNew(metadata.index("test2")).build())
.build();
clusterState = ClusterState.builder(clusterState)
.nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")))
.build();
clusterState = allocation.reroute(clusterState, "reroute");

String[] indices = new String[] { "test1", "test2" };
// Verifies against all primary shards on the node
assertThat(clusterState.routingTable().allShardsSatisfyingPredicate(indices, ShardRouting::primary).size(), is(2));
// Verifies against all replica shards on the node
assertThat(
clusterState.routingTable().allShardsSatisfyingPredicate(indices, shardRouting -> !shardRouting.primary()).size(),
is(2)
);
}

public void testActivePrimaryShardsGrouped() {
assertThat(this.emptyRoutingTable.activePrimaryShardsGrouped(new String[0], true).size(), is(0));
assertThat(this.emptyRoutingTable.activePrimaryShardsGrouped(new String[0], false).size(), is(0));
Expand Down
Loading