Skip to content

Commit

Permalink
Add lower limit for primary and replica batch allocators timeout (#14979
Browse files Browse the repository at this point in the history
)

* Add lower limit for primary and replica batch allocators

Signed-off-by: Rishab Nahata <[email protected]>
  • Loading branch information
imRishN authored Jul 30, 2024
1 parent ffa67f9 commit 09276b3
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `actions/github-script` from 6 to 7 ([#14997](https://github.com/opensearch-project/OpenSearch/pull/14997))

### Changed
- Add lower limit for primary and replica batch allocators timeout ([#14979](https://github.com/opensearch-project/OpenSearch/pull/14979))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -886,7 +886,7 @@ public void testBatchModeEnabledWithSufficientTimeoutAndClusterGreen() throws Ex
assertEquals(0, gatewayAllocator.getNumberOfInFlightFetches());
}

public void testBatchModeEnabledWithInSufficientTimeoutButClusterGreen() throws Exception {
public void testBatchModeEnabledWithDisabledTimeoutAndClusterGreen() throws Exception {

internalCluster().startClusterManagerOnlyNodes(
1,
Expand Down Expand Up @@ -920,8 +920,8 @@ public void testBatchModeEnabledWithInSufficientTimeoutButClusterGreen() throws
.put("node.name", clusterManagerName)
.put(clusterManagerDataPathSettings)
.put(ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE.getKey(), 5)
.put(ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "10ms")
.put(ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "10ms")
.put(ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "-1")
.put(ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "-1")
.put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true)
.build()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,14 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {
private final long maxBatchSize;
private static final short DEFAULT_SHARD_BATCH_SIZE = 2000;

private static final String PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY =
public static final String PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY =
"cluster.routing.allocation.shards_batch_gateway_allocator.primary_allocator_timeout";
private static final String REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY =
public static final String REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY =
"cluster.routing.allocation.shards_batch_gateway_allocator.replica_allocator_timeout";

private TimeValue primaryShardsBatchGatewayAllocatorTimeout;
private TimeValue replicaShardsBatchGatewayAllocatorTimeout;
public static final TimeValue MIN_ALLOCATOR_TIMEOUT = TimeValue.timeValueSeconds(20);

/**
* Number of shards we send in one batch to data nodes for fetching metadata
Expand All @@ -92,16 +93,50 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {
Setting.Property.NodeScope
);

/**
* Timeout for existing primary shards batch allocator.
* Timeout value must be greater than or equal to 20s or -1ms to effectively disable timeout
*/
public static final Setting<TimeValue> PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting(
PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY,
TimeValue.MINUS_ONE,
TimeValue.MINUS_ONE,
new Setting.Validator<>() {
@Override
public void validate(TimeValue timeValue) {
if (timeValue.compareTo(MIN_ALLOCATOR_TIMEOUT) < 0 && timeValue.compareTo(TimeValue.MINUS_ONE) != 0) {
throw new IllegalArgumentException(
"Setting ["
+ PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey()
+ "] should be more than 20s or -1ms to disable timeout"
);
}
}
},
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* Timeout for existing replica shards batch allocator.
* Timeout value must be greater than or equal to 20s or -1ms to effectively disable timeout
*/
public static final Setting<TimeValue> REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting(
REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY,
TimeValue.MINUS_ONE,
TimeValue.MINUS_ONE,
new Setting.Validator<>() {
@Override
public void validate(TimeValue timeValue) {
if (timeValue.compareTo(MIN_ALLOCATOR_TIMEOUT) < 0 && timeValue.compareTo(TimeValue.MINUS_ONE) != 0) {
throw new IllegalArgumentException(
"Setting ["
+ REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey()
+ "] should be more than 20s or -1ms to disable timeout"
);
}
}
},
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.gateway.ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING;
import static org.opensearch.gateway.ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY;
import static org.opensearch.gateway.ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING;
import static org.opensearch.gateway.ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY;

public class GatewayAllocatorTests extends OpenSearchAllocationTestCase {

private final Logger logger = LogManager.getLogger(GatewayAllocatorTests.class);
Expand Down Expand Up @@ -368,6 +373,56 @@ public void testCreatePrimaryAndReplicaExecutorOfSizeTwo() {
assertEquals(executor.getTimeoutAwareRunnables().size(), 2);
}

public void testPrimaryAllocatorTimeout() {
// Valid setting with timeout = 20s
Settings build = Settings.builder().put(PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "20s").build();
assertEquals(20, PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getSeconds());

// Valid setting with timeout > 20s
build = Settings.builder().put(PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "30000ms").build();
assertEquals(30, PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getSeconds());

// Invalid setting with timeout < 20s
Settings lessThan20sSetting = Settings.builder().put(PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "10s").build();
IllegalArgumentException iae = expectThrows(
IllegalArgumentException.class,
() -> PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(lessThan20sSetting)
);
assertEquals(
"Setting [" + PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey() + "] should be more than 20s or -1ms to disable timeout",
iae.getMessage()
);

// Valid setting with timeout = -1
build = Settings.builder().put(PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "-1").build();
assertEquals(-1, PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getMillis());
}

public void testReplicaAllocatorTimeout() {
// Valid setting with timeout = 20s
Settings build = Settings.builder().put(REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "20s").build();
assertEquals(20, REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getSeconds());

// Valid setting with timeout > 20s
build = Settings.builder().put(REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "30000ms").build();
assertEquals(30, REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getSeconds());

// Invalid setting with timeout < 20s
Settings lessThan20sSetting = Settings.builder().put(REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "10s").build();
IllegalArgumentException iae = expectThrows(
IllegalArgumentException.class,
() -> REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(lessThan20sSetting)
);
assertEquals(
"Setting [" + REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey() + "] should be more than 20s or -1ms to disable timeout",
iae.getMessage()
);

// Valid setting with timeout = -1
build = Settings.builder().put(REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "-1").build();
assertEquals(-1, REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getMillis());
}

private void createIndexAndUpdateClusterState(int count, int numberOfShards, int numberOfReplicas) {
if (count == 0) return;
Metadata.Builder metadata = Metadata.builder();
Expand Down

0 comments on commit 09276b3

Please sign in to comment.