Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna committed Jun 24, 2024
1 parent cf053d2 commit 2bcc876
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added
- Add fingerprint ingest processor ([#13724](https://github.com/opensearch-project/OpenSearch/pull/13724))
- [Remote Store] Rate limiter for remote store low priority uploads ([#14374](https://github.com/opensearch-project/OpenSearch/pull/14374/))
- Add allocation constraints buffer and setting to enable index balance constraint ([#14515](https://github.com/opensearch-project/OpenSearch/pull/14515))
- Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ public AllocationConstraints(AllocationParameter allocationParameter) {
);
this.constraints.put(
INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID,
new Constraint(isPerIndexPrimaryShardsPerNodeBreached(allocationParameter.getPreferPrimaryBalanceIndexBuffer()))
new Constraint(isPerIndexPrimaryShardsPerNodeBreached(allocationParameter.getPrimaryBalanceIndexBuffer()))
);
this.constraints.put(
CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID,
new Constraint(isPrimaryShardsPerNodeBreached(allocationParameter.getPreferPrimaryBalanceShardBuffer()))
new Constraint(isPrimaryShardsPerNodeBreached(allocationParameter.getPrimaryBalanceShardBuffer()))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,24 @@
* Allocation Constraint Parameters
*/
public class AllocationParameter {
private float preferPrimaryBalanceShardBuffer;
private final float primaryBalanceShardBuffer;

private float preferPrimaryBalanceIndexBuffer;
private final float primaryBalanceIndexBuffer;

private float shardBalanceBuffer;
private final float shardBalanceBuffer;

public AllocationParameter(float shardBalanceBuffer, float preferPrimaryBalanceIndexBuffer, float preferPrimaryBalanceShardBuffer) {
this.preferPrimaryBalanceShardBuffer = preferPrimaryBalanceShardBuffer;
this.preferPrimaryBalanceIndexBuffer = preferPrimaryBalanceIndexBuffer;
public AllocationParameter(float shardBalanceBuffer, float primaryBalanceIndexBuffer, float primaryBalanceShardBuffer) {
this.shardBalanceBuffer = shardBalanceBuffer;
this.primaryBalanceShardBuffer = primaryBalanceShardBuffer;
this.primaryBalanceIndexBuffer = primaryBalanceIndexBuffer;
}

public float getPreferPrimaryBalanceShardBuffer() {
return preferPrimaryBalanceShardBuffer;
public float getPrimaryBalanceShardBuffer() {
return primaryBalanceShardBuffer;
}

public float getPreferPrimaryBalanceIndexBuffer() {
return preferPrimaryBalanceIndexBuffer;
public float getPrimaryBalanceIndexBuffer() {
return primaryBalanceIndexBuffer;
}

public float getShardBalanceBuffer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import java.util.Set;

import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID;

Expand Down Expand Up @@ -149,7 +148,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
);

public static final Setting<Boolean> SHARDS_PER_INDEX_BALANCE = Setting.boolSetting(
"cluster.routing.allocation.balance.constraint.shard.enable",
"cluster.routing.allocation.balance.constraint.index.enable",
true,
Property.Dynamic,
Property.NodeScope
Expand All @@ -162,15 +161,15 @@ public class BalancedShardsAllocator implements ShardsAllocator {
Property.NodeScope
);

public static final Setting<Float> PREFER_PRIMARY_SHARD_BALANCE_BUFFER = Setting.floatSetting(
public static final Setting<Float> PRIMARY_SHARD_BALANCE_BUFFER = Setting.floatSetting(
"cluster.routing.allocation.balance.primary.shard.buffer",
0.0f,
0.0f,
Property.Dynamic,
Property.NodeScope
);

public static final Setting<Float> PREFER_PRIMARY_INDEX_BALANCE_BUFFER = Setting.floatSetting(
public static final Setting<Float> PRIMARY_INDEX_BALANCE_BUFFER = Setting.floatSetting(
"cluster.routing.allocation.balance.primary.index.buffer",
0.0f,
0.0f,
Expand All @@ -179,7 +178,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
);

public static final Setting<Float> SHARDS_PER_INDEX_BALANCE_BUFFER = Setting.floatSetting(
"cluster.routing.allocation.balance.constraint.shard.buffer",
"cluster.routing.allocation.balance.constraint.index.buffer",
0.0f,
0.0f,
Property.Dynamic,
Expand All @@ -202,9 +201,8 @@ public class BalancedShardsAllocator implements ShardsAllocator {
private volatile boolean shardsPerIndexBalance;
private volatile boolean preferPrimaryShardRebalance;

private volatile float preferPrimaryBalanceShardBuffer;

private volatile float preferPrimaryBalanceIndexBuffer;
private volatile float primaryBalanceShardBuffer;
private volatile float primaryBalanceIndexBuffer;

private volatile float shardsPerIndexBalanceBuffer;
private volatile float preferPrimaryShardRebalanceBuffer;
Expand All @@ -219,26 +217,27 @@ public BalancedShardsAllocator(Settings settings) {

@Inject
public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) {
setThreshold(THRESHOLD_SETTING.get(settings));
setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings));
setShardBalanceFactor(SHARD_BALANCE_FACTOR_SETTING.get(settings));
setIndexBalanceFactor(INDEX_BALANCE_FACTOR_SETTING.get(settings));
setPreferPrimaryBalanceShardBuffer(PREFER_PRIMARY_SHARD_BALANCE_BUFFER.get(settings));
setPreferPrimaryBalanceIndexBuffer(PREFER_PRIMARY_INDEX_BALANCE_BUFFER.get(settings));
setPrimaryBalanceShardBuffer(PRIMARY_SHARD_BALANCE_BUFFER.get(settings));
setPrimaryBalanceIndexBuffer(PRIMARY_INDEX_BALANCE_BUFFER.get(settings));
setShardsPerIndexBalanceBuffer(SHARDS_PER_INDEX_BALANCE_BUFFER.get(settings));
setPreferPrimaryShardRebalanceBuffer(PRIMARY_SHARD_REBALANCE_BUFFER.get(settings));
updateWeightFunction();
setThreshold(THRESHOLD_SETTING.get(settings));
setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings));
setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings));
setShardsPerIndexBalance(SHARDS_PER_INDEX_BALANCE.get(settings));
setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings));
updateWeightFunction();

clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance);
clusterSettings.addSettingsUpdateConsumer(SHARDS_PER_INDEX_BALANCE, this::setShardsPerIndexBalance);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy);
clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, this::updateIndexBalanceFactor);
clusterSettings.addSettingsUpdateConsumer(SHARD_BALANCE_FACTOR_SETTING, this::updateShardBalanceFactor);
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE_BUFFER, this::setPreferPrimaryBalanceShardBuffer);
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_INDEX_BALANCE_BUFFER, this::setPreferPrimaryBalanceIndexBuffer);
clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_BALANCE_BUFFER, this::setPrimaryBalanceShardBuffer);
clusterSettings.addSettingsUpdateConsumer(PRIMARY_INDEX_BALANCE_BUFFER, this::setPrimaryBalanceIndexBuffer);
clusterSettings.addSettingsUpdateConsumer(SHARDS_PER_INDEX_BALANCE_BUFFER, this::setShardsPerIndexBalanceBuffer);
clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::updatePreferPrimaryShardBalanceBuffer);
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance);
Expand Down Expand Up @@ -276,6 +275,7 @@ private void setShardBalanceFactor(float shardBalanceFactor) {

private void setPreferPrimaryShardRebalanceBuffer(float preferPrimaryShardRebalanceBuffer) {
this.preferPrimaryShardRebalanceBuffer = preferPrimaryShardRebalanceBuffer;
updateWeightFunction();
}

private void updateIndexBalanceFactor(float indexBalanceFactor) {
Expand All @@ -298,10 +298,14 @@ private void updateWeightFunction() {
this.indexBalanceFactor,
this.shardBalanceFactor,
this.shardsPerIndexBalanceBuffer,
this.preferPrimaryBalanceIndexBuffer,
this.preferPrimaryBalanceShardBuffer,
this.primaryBalanceIndexBuffer,
this.primaryBalanceShardBuffer,
this.preferPrimaryShardRebalanceBuffer
);
this.weightFunction.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance);
this.weightFunction.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance);
this.weightFunction.updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, shardsPerIndexBalance);
this.weightFunction.updateRebalanceConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance);
}

/**
Expand All @@ -310,23 +314,21 @@ private void updateWeightFunction() {
*/
private void setPreferPrimaryShardBalance(boolean preferPrimaryShardBalance) {
this.preferPrimaryShardBalance = preferPrimaryShardBalance;
this.weightFunction.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance);
this.weightFunction.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance);
this.weightFunction.updateRebalanceConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance);
updateWeightFunction();
}

private void setShardsPerIndexBalance(boolean shardsPerIndexBalance) {
this.shardsPerIndexBalance = shardsPerIndexBalance;
this.weightFunction.updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, shardsPerIndexBalance);
updateWeightFunction();
}

private void setPreferPrimaryBalanceShardBuffer(float preferPrimaryBalanceShardBuffer) {
this.preferPrimaryBalanceShardBuffer = preferPrimaryBalanceShardBuffer;
private void setPrimaryBalanceShardBuffer(float primaryBalanceShardBuffer) {
this.primaryBalanceShardBuffer = primaryBalanceShardBuffer;
updateWeightFunction();
}

private void setPreferPrimaryBalanceIndexBuffer(float preferPrimaryBalanceIndexBuffer) {
this.preferPrimaryBalanceIndexBuffer = preferPrimaryBalanceIndexBuffer;
private void setPrimaryBalanceIndexBuffer(float primaryBalanceIndexBuffer) {
this.primaryBalanceIndexBuffer = primaryBalanceIndexBuffer;
updateWeightFunction();
}

Expand All @@ -337,7 +339,7 @@ private void setShardsPerIndexBalanceBuffer(float shardsPerIndexBalanceBuffer) {

private void setPreferPrimaryShardRebalance(boolean preferPrimaryShardRebalance) {
this.preferPrimaryShardRebalance = preferPrimaryShardRebalance;
this.weightFunction.updateRebalanceConstraint(CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID, preferPrimaryShardRebalance);
updateWeightFunction();
}

private void setThreshold(float threshold) {
Expand Down Expand Up @@ -493,8 +495,8 @@ static class WeightFunction {
float indexBalance,
float shardBalance,
float shardsPerIndexBalanceBuffer,
float preferPrimaryBalanceIndexBuffer,
float preferPrimaryBalanceShardBuffer,
float primaryBalanceIndexBuffer,
float primaryBalanceShardBuffer,
float preferPrimaryBalanceBuffer
) {
float sum = indexBalance + shardBalance;
Expand All @@ -505,14 +507,14 @@ static class WeightFunction {
theta0 = 0;
theta1 = 0;
} else {
throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
throw new IllegalArgumentException("Balance factors must sum to a value >= 0 but was: " + sum);
}
this.indexBalance = indexBalance;
this.shardBalance = shardBalance;
AllocationParameter allocationParameter = new AllocationParameter(
shardsPerIndexBalanceBuffer,
preferPrimaryBalanceIndexBuffer,
preferPrimaryBalanceShardBuffer
primaryBalanceIndexBuffer,
primaryBalanceShardBuffer
);
RebalanceParameter rebalanceParameter = new RebalanceParameter(preferPrimaryBalanceBuffer);
this.constraints = new AllocationConstraints(allocationParameter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,8 @@ public void apply(Settings value, Settings current, Settings previous) {
BalancedShardsAllocator.PRIMARY_SHARD_REBALANCE_BUFFER,
BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE,
BalancedShardsAllocator.PREFER_PRIMARY_SHARD_REBALANCE,
BalancedShardsAllocator.PREFER_PRIMARY_INDEX_BALANCE_BUFFER,
BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE_BUFFER,
BalancedShardsAllocator.PRIMARY_INDEX_BALANCE_BUFFER,
BalancedShardsAllocator.PRIMARY_SHARD_BALANCE_BUFFER,
BalancedShardsAllocator.SHARDS_PER_INDEX_BALANCE,
BalancedShardsAllocator.SHARDS_PER_INDEX_BALANCE_BUFFER,
BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING,
Expand Down

0 comments on commit 2bcc876

Please sign in to comment.