Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add buffer for allocation constraints and a switch to disable index b… #14515

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added
- Add fingerprint ingest processor ([#13724](https://github.com/opensearch-project/OpenSearch/pull/13724))
- [Remote Store] Rate limiter for remote store low priority uploads ([#14374](https://github.com/opensearch-project/OpenSearch/pull/14374/))
- Add allocation constraints buffer and setting to enable index balance constraint ([#14515](https://github.com/opensearch-project/OpenSearch/pull/14515))
- Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,20 @@
public class AllocationConstraints {
private Map<String, Constraint> constraints;

public AllocationConstraints() {
public AllocationConstraints(AllocationParameter allocationParameter) {
this.constraints = new HashMap<>();
this.constraints.put(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, new Constraint(isIndexShardsPerNodeBreached()));
this.constraints.put(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached()));
this.constraints.put(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached(0.0f)));
this.constraints.put(
INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID,
new Constraint(isIndexShardsPerNodeBreached(allocationParameter.getShardBalanceBuffer()))
);
this.constraints.put(
INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID,
new Constraint(isPerIndexPrimaryShardsPerNodeBreached(allocationParameter.getPrimaryBalanceIndexBuffer()))
);
this.constraints.put(
CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID,
new Constraint(isPrimaryShardsPerNodeBreached(allocationParameter.getPrimaryBalanceShardBuffer()))
);
}

public void updateAllocationConstraint(String constraint, boolean enable) {
Expand All @@ -43,4 +52,8 @@ public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode no
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index);
return params.weight(constraints);
}

Constraint getAllocationConstraint(String name) {
return constraints.get(name);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.allocation;

/**
* Allocation Constraint Parameters
*/
public class AllocationParameter {
private final float primaryBalanceShardBuffer;

private final float primaryBalanceIndexBuffer;

private final float shardBalanceBuffer;

public AllocationParameter(float shardBalanceBuffer, float primaryBalanceIndexBuffer, float primaryBalanceShardBuffer) {
this.shardBalanceBuffer = shardBalanceBuffer;
this.primaryBalanceShardBuffer = primaryBalanceShardBuffer;
this.primaryBalanceIndexBuffer = primaryBalanceIndexBuffer;
}

public float getPrimaryBalanceShardBuffer() {
return primaryBalanceShardBuffer;
}

public float getPrimaryBalanceIndexBuffer() {
return primaryBalanceIndexBuffer;
}

public float getShardBalanceBuffer() {
return shardBalanceBuffer;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public void setEnable(boolean enable) {
this.enable = enable;
}

boolean isEnable() {
return enable;
}

static class ConstraintParams {
private ShardsBalancer balancer;
private BalancedShardsAllocator.ModelNode node;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,22 @@ public class ConstraintTypes {
public final static long CONSTRAINT_WEIGHT = 1000000L;

/**
* Defines per index constraint which is breached when a node contains more than avg number of primary shards for an index
* Defines per index constraint which is breached when a node contains more than avg number of primary shards for an index with added buffer
*/
public final static String INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID = "index.primary.shard.balance.constraint";

/**
* Defines a cluster constraint which is breached when a node contains more than avg primary shards across all indices
* Defines a cluster constraint which is breached when a node contains more than avg primary shards across all indices with added buffer
*/
public final static String CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID = "cluster.primary.shard.balance.constraint";

/**
* Defines a cluster constraint which is breached when a node contains more than avg primary shards across all indices
* Defines a cluster constraint which is breached when a node contains more than avg primary shards across all indices with added buffer
*/
public final static String CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID = "cluster.primary.shard.rebalance.constraint";

/**
* Defines an index constraint which is breached when a node contains more than avg number of shards for an index
* Defines an index constraint which is breached when a node contains more than avg number of shards for an index with added buffer
*/
public final static String INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID = "index.shard.count.constraint";

Expand All @@ -50,27 +50,30 @@ public class ConstraintTypes {
* on one node, often resulting in a hotspot on that node.
* <p>
* This constraint is breached when balancer attempts to allocate more than
* average shards per index per node.
* average shards per index per node with added buffer.
*/
public static Predicate<Constraint.ConstraintParams> isIndexShardsPerNodeBreached() {
public static Predicate<Constraint.ConstraintParams> isIndexShardsPerNodeBreached(float buffer) {
return (params) -> {
int currIndexShardsOnNode = params.getNode().numShards(params.getIndex());
int allowedIndexShardsPerNode = (int) Math.ceil(params.getBalancer().avgShardsPerNode(params.getIndex()));
int allowedIndexShardsPerNode = (int) Math.ceil(params.getBalancer().avgShardsPerNode(params.getIndex()) * (1 + buffer));
return (currIndexShardsOnNode >= allowedIndexShardsPerNode);
};
}

/**
* Defines a predicate which returns true when specific to an index, a node contains more than average number of primary
* shards. This constraint is used in weight calculation during allocation and rebalancing. When breached a high weight
* shards with added buffer. This constraint is used in weight calculation during allocation and rebalancing. When breached a high weight
* {@link ConstraintTypes#CONSTRAINT_WEIGHT} is assigned to node resulting in lesser chances of node being selected
* as allocation or rebalancing target
*/
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
public static Predicate<Constraint.ConstraintParams> isPerIndexPrimaryShardsPerNodeBreached() {
public static Predicate<Constraint.ConstraintParams> isPerIndexPrimaryShardsPerNodeBreached(float buffer) {
return (params) -> {
int perIndexPrimaryShardCount = params.getNode().numPrimaryShards(params.getIndex());
int perIndexAllowedPrimaryShardCount = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode(params.getIndex()));
return perIndexPrimaryShardCount > perIndexAllowedPrimaryShardCount;
int perIndexAllowedPrimaryShardCount = (int) Math.ceil(
params.getBalancer().avgPrimaryShardsPerNode(params.getIndex()) * (1 + buffer)
);

return perIndexPrimaryShardCount >= perIndexAllowedPrimaryShardCount;
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class RebalanceConstraints {

public RebalanceConstraints(RebalanceParameter rebalanceParameter) {
this.constraints = new HashMap<>();
this.constraints.put(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached()));
this.constraints.put(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached(0.0f)));
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
this.constraints.put(
CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID,
new Constraint(isPrimaryShardsPerNodeBreached(rebalanceParameter.getPreferPrimaryBalanceBuffer()))
Expand Down
Loading
Loading