Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add lower limit for primary and replica batch allocators timeout #14979

Merged
merged 7 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `org.apache.commons:commons-lang3` from 3.14.0 to 3.15.0 ([#14861](https://github.com/opensearch-project/OpenSearch/pull/14861))

### Changed
- Add lower limit for primary and replica batch allocators timeout ([#14979](https://github.com/opensearch-project/OpenSearch/pull/14979))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,7 @@ public void testBatchModeEnabledWithSufficientTimeoutAndClusterGreen() throws Ex
assertEquals(0, gatewayAllocator.getNumberOfInFlightFetches());
}

public void testBatchModeEnabledWithInSufficientTimeoutButClusterGreen() throws Exception {
public void testBatchModeEnabledWithDisabledTimeoutAndClusterGreen() throws Exception {

internalCluster().startClusterManagerOnlyNodes(
1,
Expand Down Expand Up @@ -889,8 +889,8 @@ public void testBatchModeEnabledWithInSufficientTimeoutButClusterGreen() throws
.put("node.name", clusterManagerName)
.put(clusterManagerDataPathSettings)
.put(ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE.getKey(), 5)
.put(ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "10ms")
.put(ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "10ms")
.put(ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "-1")
.put(ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey(), "-1")
.put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true)
.build()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,14 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {
private final long maxBatchSize;
private static final short DEFAULT_SHARD_BATCH_SIZE = 2000;

private static final String PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY =
public static final String PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY =
"cluster.routing.allocation.shards_batch_gateway_allocator.primary_allocator_timeout";
private static final String REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY =
public static final String REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY =
"cluster.routing.allocation.shards_batch_gateway_allocator.replica_allocator_timeout";

private TimeValue primaryShardsBatchGatewayAllocatorTimeout;
private TimeValue replicaShardsBatchGatewayAllocatorTimeout;
public static final TimeValue MIN_ALLOCATOR_TIMEOUT = TimeValue.timeValueSeconds(20);

/**
* Number of shards we send in one batch to data nodes for fetching metadata
Expand All @@ -92,16 +93,50 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {
Setting.Property.NodeScope
);

/**
* Timeout for existing primary shards batch allocator.
* Timeout value must be greater than or equal to 20s or -1ms to effectively disable timeout
*/
public static final Setting<TimeValue> PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting(
PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY,
TimeValue.MINUS_ONE,
TimeValue.MINUS_ONE,
new Setting.Validator<>() {
@Override
public void validate(TimeValue timeValue) {
if (timeValue.compareTo(MIN_ALLOCATOR_TIMEOUT) < 0 && timeValue.compareTo(TimeValue.MINUS_ONE) != 0) {
throw new IllegalArgumentException(
"Setting ["
+ PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey()
+ "] should be more than 20s or -1ms to disable timeout"
);
}
}
},
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* Timeout for existing replica shards batch allocator.
* Timeout value must be greater than or equal to 20s or -1ms to effectively disable timeout
*/
public static final Setting<TimeValue> REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting(
REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY,
TimeValue.MINUS_ONE,
TimeValue.MINUS_ONE,
new Setting.Validator<>() {
@Override
public void validate(TimeValue timeValue) {
if (timeValue.compareTo(MIN_ALLOCATOR_TIMEOUT) < 0 && timeValue.compareTo(TimeValue.MINUS_ONE) != 0) {
throw new IllegalArgumentException(
"Setting ["
+ REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey()
+ "] should be more than 20s or -1ms to disable timeout"
);
}
}
},
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.gateway.ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING;
import static org.opensearch.gateway.ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY;
import static org.opensearch.gateway.ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING;
import static org.opensearch.gateway.ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY;

public class GatewayAllocatorTests extends OpenSearchAllocationTestCase {

private final Logger logger = LogManager.getLogger(GatewayAllocatorTests.class);
Expand Down Expand Up @@ -368,6 +373,56 @@ public void testCreatePrimaryAndReplicaExecutorOfSizeTwo() {
assertEquals(executor.getTimeoutAwareRunnables().size(), 2);
}

public void testPrimaryAllocatorTimeout() {
// Valid setting with timeout = 20s
Settings build = Settings.builder().put(PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "20s").build();
assertEquals(20, PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getSeconds());

// Valid setting with timeout > 20s
build = Settings.builder().put(PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "30000ms").build();
assertEquals(30, PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getSeconds());

// Invalid setting with timeout < 20s
Settings lessThan20sSetting = Settings.builder().put(PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "10s").build();
IllegalArgumentException iae = expectThrows(
IllegalArgumentException.class,
() -> PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(lessThan20sSetting)
);
assertEquals(
"Setting [" + PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey() + "] should be more than 20s or -1ms to disable timeout",
iae.getMessage()
);

// Valid setting with timeout = -1
build = Settings.builder().put(PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "-1").build();
assertEquals(-1, PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getMillis());
}

public void testReplicaAllocatorTimeout() {
// Valid setting with timeout = 20s
Settings build = Settings.builder().put(REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "20s").build();
assertEquals(20, REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getSeconds());

// Valid setting with timeout > 20s
build = Settings.builder().put(REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "30000ms").build();
assertEquals(30, REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getSeconds());

// Invalid setting with timeout < 20s
Settings lessThan20sSetting = Settings.builder().put(REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "10s").build();
IllegalArgumentException iae = expectThrows(
IllegalArgumentException.class,
() -> REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(lessThan20sSetting)
);
assertEquals(
"Setting [" + REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.getKey() + "] should be more than 20s or -1ms to disable timeout",
iae.getMessage()
);

// Valid setting with timeout = -1
build = Settings.builder().put(REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, "-1").build();
assertEquals(-1, REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(build).getMillis());
}

private void createIndexAndUpdateClusterState(int count, int numberOfShards, int numberOfReplicas) {
if (count == 0) return;
Metadata.Builder metadata = Metadata.builder();
Expand Down
Loading