Skip to content

Commit

Permalink
fixed the PR comments and added Settings test
Browse files Browse the repository at this point in the history
  • Loading branch information
rajiv-kv committed Mar 13, 2024
1 parent 68968cf commit 8f3a5ea
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.junit.Before;

import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE;
import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.CLUSTER_INFO_CPU_USAGE_LIMIT;
import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.CLUSTER_ADMIN_CPU_USAGE_LIMIT;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

Expand All @@ -45,7 +45,7 @@ public class AdmissionForClusterManagerIT extends OpenSearchIntegTestCase {
private static final Settings ENFORCE_ADMISSION_CONTROL = Settings.builder()
.put(ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500))
.put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED)
.put(CLUSTER_INFO_CPU_USAGE_LIMIT.getKey(), 50)
.put(CLUSTER_ADMIN_CPU_USAGE_LIMIT.getKey(), 50)
.build();

@Before
Expand Down Expand Up @@ -92,7 +92,7 @@ public void testAdmissionControlEnforced() throws InterruptedException {
AdmissionControlService.class
);
AdmissionControllerStats admissionStats = admissionControlServicePrimary.stats().getAdmissionControllerStatsList().get(0);
assertEquals(admissionStats.rejectionCount.get(AdmissionControlActionType.CLUSTER_INFO.getType()).longValue(), 1);
assertEquals(admissionStats.rejectionCount.get(AdmissionControlActionType.CLUSTER_ADMIN.getType()).longValue(), 1);
assertNull(admissionStats.rejectionCount.get(AdmissionControlActionType.SEARCH.getType()));
assertNull(admissionStats.rejectionCount.get(AdmissionControlActionType.INDEXING.getType()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,26 +109,16 @@ protected HandledTransportAction(
) {
super(actionName, actionFilters, transportService.getTaskManager());

if (admissionControlActionType != null) {
transportService.registerRequestHandler(
actionName,
executor,
false,
canTripCircuitBreaker,
admissionControlActionType,
requestReader,
new TransportHandler()
);
} else {
transportService.registerRequestHandler(
actionName,
executor,
false,
canTripCircuitBreaker,
requestReader,
new TransportHandler()
);
}
transportService.registerRequestHandler(
actionName,
executor,
false,
canTripCircuitBreaker,
admissionControlActionType,
requestReader,
new TransportHandler()
);

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ protected TransportClusterManagerNodeReadAction(
this(
actionName,
true,
AdmissionControlActionType.CLUSTER_INFO,
AdmissionControlActionType.CLUSTER_ADMIN,
transportService,
clusterService,
threadPool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,11 +701,13 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,

// Admission Control Settings
AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE,
CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT,
CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT,
CpuBasedAdmissionControllerSettings.CLUSTER_INFO_CPU_USAGE_LIMIT,
CpuBasedAdmissionControllerSettings.CLUSTER_ADMIN_CPU_USAGE_LIMIT,
IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING,

// Concurrent segment search settings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ private long getCpuRejectionThreshold(AdmissionControlActionType admissionContro
return this.settings.getSearchCPULimit();
case INDEXING:
return this.settings.getIndexingCPULimit();
case CLUSTER_INFO:
return this.settings.getClusterInfoCPULimit();
case CLUSTER_ADMIN:
return this.settings.getClusterAdminCPULimit();
default:
throw new IllegalArgumentException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
public enum AdmissionControlActionType {
INDEXING("indexing"),
SEARCH("search"),
CLUSTER_INFO("cluster_info");
CLUSTER_ADMIN("cluster_admin");

private final String type;

Expand All @@ -34,15 +34,12 @@ public String getType() {

public static AdmissionControlActionType fromName(String name) {
name = name.toLowerCase(Locale.ROOT);
switch (name) {
case "indexing":
return INDEXING;
case "search":
return SEARCH;
case "cluster_info":
return CLUSTER_INFO;
default:
throw new IllegalArgumentException("Not Supported TransportAction Type: " + name);

for (AdmissionControlActionType type : AdmissionControlActionType.values()) {
if (type.name().equals(name)) {
return type;
}
}
throw new IllegalArgumentException("Not Supported TransportAction Type: " + name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public static class Defaults {
Setting.Property.NodeScope
);

public static final Setting<Long> CLUSTER_INFO_CPU_USAGE_LIMIT = Setting.longSetting(
public static final Setting<Long> CLUSTER_ADMIN_CPU_USAGE_LIMIT = Setting.longSetting(
"admission_control.cluster.admin.cpu_usage.limit",
Defaults.CPU_USAGE_LIMIT,
Setting.Property.Dynamic,
Expand All @@ -77,10 +77,10 @@ public CpuBasedAdmissionControllerSettings(ClusterSettings clusterSettings, Sett
clusterSettings.addSettingsUpdateConsumer(CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, this::setTransportLayerMode);
this.searchCPULimit = SEARCH_CPU_USAGE_LIMIT.get(settings);
this.indexingCPULimit = INDEXING_CPU_USAGE_LIMIT.get(settings);
this.clusterInfoCPULimit = CLUSTER_INFO_CPU_USAGE_LIMIT.get(settings);
this.clusterInfoCPULimit = CLUSTER_ADMIN_CPU_USAGE_LIMIT.get(settings);
clusterSettings.addSettingsUpdateConsumer(INDEXING_CPU_USAGE_LIMIT, this::setIndexingCPULimit);
clusterSettings.addSettingsUpdateConsumer(SEARCH_CPU_USAGE_LIMIT, this::setSearchCPULimit);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_INFO_CPU_USAGE_LIMIT, this::setClusterInfoCPULimit);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ADMIN_CPU_USAGE_LIMIT, this::setClusterInfoCPULimit);

}

Expand All @@ -100,7 +100,7 @@ public Long getIndexingCPULimit() {
return indexingCPULimit;
}

public Long getClusterInfoCPULimit() {
public Long getClusterAdminCPULimit() {
return clusterInfoCPULimit;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1214,7 +1214,11 @@ public <Request extends TransportRequest> void registerRequestHandler(
TransportRequestHandler<Request> handler
) {
validateActionName(action);
handler = interceptor.interceptHandler(action, executor, forceExecution, handler, admissionControlActionType);
if (admissionControlActionType != null) {
handler = interceptor.interceptHandler(action, executor, forceExecution, handler, admissionControlActionType);
} else {
handler = interceptor.interceptHandler(action, executor, forceExecution, handler);
}
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
action,
requestReader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public void testSettingsExists() {
Arrays.asList(
CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT,
CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT
CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT,
CpuBasedAdmissionControllerSettings.CLUSTER_ADMIN_CPU_USAGE_LIMIT
)
)
);
Expand Down Expand Up @@ -149,4 +150,33 @@ public void testUpdateAfterGetConfiguredSettings() {
assertEquals(cpuBasedAdmissionControllerSettings.getSearchCPULimit().longValue(), searchPercent);
assertEquals(cpuBasedAdmissionControllerSettings.getIndexingCPULimit().longValue(), indexingPercent);
}

public void testConfiguredSettingsForAdmin() {
Settings settings = Settings.builder()
.put(
CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
AdmissionControlMode.ENFORCED.getMode()
)
.put(CpuBasedAdmissionControllerSettings.CLUSTER_ADMIN_CPU_USAGE_LIMIT.getKey(), 50)
.build();

CpuBasedAdmissionControllerSettings cpuBasedAdmissionControllerSettings = new CpuBasedAdmissionControllerSettings(
clusterService.getClusterSettings(),
settings
);
assertEquals(cpuBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED);
assertEquals(cpuBasedAdmissionControllerSettings.getClusterAdminCPULimit().longValue(), 50);

Settings updatedSettings = Settings.builder()
.put(
CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
AdmissionControlMode.MONITOR.getMode()
)
.put(CpuBasedAdmissionControllerSettings.CLUSTER_ADMIN_CPU_USAGE_LIMIT.getKey(), 90)
.build();
clusterService.getClusterSettings().applySettings(updatedSettings);
assertEquals(cpuBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.MONITOR);
assertEquals(cpuBasedAdmissionControllerSettings.getClusterAdminCPULimit().longValue(), 90);

}
}

0 comments on commit 8f3a5ea

Please sign in to comment.