Skip to content

Commit

Permalink
javadoc changes and more
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie authored and Ajay Kumar Movva committed Oct 24, 2023
1 parent 44b4f07 commit b095493
Show file tree
Hide file tree
Showing 21 changed files with 181 additions and 151 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote cluster state] Restore global metadata from remote store when local state is lost after quorum loss ([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404))
- [AdmissionControl] Added changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286))
- GHA to verify checklist items completion in PR descriptions ([#10800](https://github.com/opensearch-project/OpenSearch/pull/10800))
- [AdmissionControl] Added changes to integrade cpu AC to ResourceUsageCollector and Emit Stats
- [Admission Control] Add changes to integrate CPU AC and ResourceUsageCollector with Stats ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ public NodeStats(StreamInput in) throws IOException {
} else {
repositoriesStats = null;
}
if(in.getVersion().onOrAfter(Version.V_3_0_0)) {
// TODO: change to V_2_12_0 on main after backport to 2.x
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
admissionControlStats = in.readOptionalWriteable(AdmissionControlStats::new);
} else {
admissionControlStats = null;
Expand Down Expand Up @@ -503,6 +504,10 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
out.writeOptionalWriteable(repositoriesStats);
}
// TODO: change to V_2_12_0 on main after backport to 2.x
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalWriteable(admissionControlStats);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ protected TransportReplicationAction(

transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest);

if(transportPrimaryAction.equals(TransportShardBulkAction.ACTION_NAME + PRIMARY_ACTION_SUFFIX)){
// Register only TransportShardBulkAction for admission control ( primary indexing action )
if (transportPrimaryAction.equals(TransportShardBulkAction.ACTION_NAME + PRIMARY_ACTION_SUFFIX)) {
transportService.registerRequestHandler(
transportPrimaryAction,
executor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,16 +300,19 @@ public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
return actualHandler;
}

/**
* Intercept the transport action and perform admission control if applicable
*/
@Override
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
String action,
String executor,
boolean forceExecution,
TransportRequestHandler<T> actualHandler,
AdmissionControlActionType transportActionType
AdmissionControlActionType admissionControlActionType
) {
for (TransportInterceptor interceptor : this.transportInterceptors) {
actualHandler = interceptor.interceptHandler(action, executor, forceExecution, actualHandler, transportActionType);
actualHandler = interceptor.interceptHandler(action, executor, forceExecution, actualHandler, admissionControlActionType);
}
return actualHandler;
}
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ public NodeStats stats(
searchPipelineStats ? this.searchPipelineService.stats() : null,
segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getTotalRejectionStats() : null,
repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null,
admissionControl ? this.admissionControlService.stats(): null
admissionControl ? this.admissionControlService.stats() : null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.node.ResourceUsageCollectorService;
import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats;
import org.opensearch.ratelimitting.admissioncontrol.stats.BaseAdmissionControllerStats;
import org.opensearch.ratelimitting.admissioncontrol.stats.CPUBasedAdmissionControllerStats;
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
Expand Down Expand Up @@ -47,8 +45,14 @@ public class AdmissionControlService {
* @param settings Immutable settings instance
* @param clusterService ClusterService Instance
* @param threadPool ThreadPool Instance
* @param resourceUsageCollectorService Instance used to get node resource usage stats
*/
public AdmissionControlService(Settings settings, ClusterService clusterService, ThreadPool threadPool, ResourceUsageCollectorService resourceUsageCollectorService) {
public AdmissionControlService(
Settings settings,
ClusterService clusterService,
ThreadPool threadPool,
ResourceUsageCollectorService resourceUsageCollectorService
) {
this.threadPool = threadPool;
this.admissionControlSettings = new AdmissionControlSettings(clusterService.getClusterSettings(), settings);
this.ADMISSION_CONTROLLERS = new ConcurrentHashMap<>();
Expand All @@ -68,11 +72,13 @@ private void initialise() {

/**
*
* @param action transport action that is being executed. we are using it for logging while request is rejected
* @param admissionControlActionType type of the admissionControllerActionType
* @param action Transport action name
* @param admissionControlActionType admissionControllerActionType value
*/
public void applyTransportAdmissionControl(String action, AdmissionControlActionType admissionControlActionType) {
this.ADMISSION_CONTROLLERS.forEach((name, admissionController) -> { admissionController.apply(action, admissionControlActionType); });
this.ADMISSION_CONTROLLERS.forEach(
(name, admissionController) -> { admissionController.apply(action, admissionControlActionType); }
);
}

/**
Expand All @@ -90,7 +96,12 @@ public void registerAdmissionController(String admissionControllerName) {
private AdmissionController controllerFactory(String admissionControllerName) {
switch (admissionControllerName) {
case CPU_BASED_ADMISSION_CONTROLLER:
return new CPUBasedAdmissionController(admissionControllerName, this.settings, this.clusterService, this.resourceUsageCollectorService);
return new CPUBasedAdmissionController(
admissionControllerName,
this.resourceUsageCollectorService,
this.clusterService,
this.settings
);
default:
throw new IllegalArgumentException("Not Supported AdmissionController : " + admissionControllerName);
}
Expand All @@ -113,26 +124,18 @@ public AdmissionController getAdmissionController(String controllerName) {
return this.ADMISSION_CONTROLLERS.getOrDefault(controllerName, null);
}

public AdmissionControlStats stats(){
List<BaseAdmissionControllerStats> statsList = new ArrayList<>();
if(this.ADMISSION_CONTROLLERS.size() > 0){
/**
* Return admission control stats
*/
public AdmissionControlStats stats() {
List<AdmissionControllerStats> statsList = new ArrayList<>();
if (this.ADMISSION_CONTROLLERS.size() > 0) {
this.ADMISSION_CONTROLLERS.forEach((controllerName, admissionController) -> {
BaseAdmissionControllerStats admissionControllerStats = controllerStatsFactory(admissionController);
if(admissionControllerStats != null) {
statsList.add(admissionControllerStats);
}
AdmissionControllerStats admissionControllerStats = new AdmissionControllerStats(admissionController, controllerName);
statsList.add(admissionControllerStats);
});
return new AdmissionControlStats(statsList);
}
return null;
}

private BaseAdmissionControllerStats controllerStatsFactory(AdmissionController admissionController) {
switch (admissionController.getName()) {
case CPU_BASED_ADMISSION_CONTROLLER:
return new CPUBasedAdmissionControllerStats(admissionController);
default:
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.node.ResourceUsageCollectorService;
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;

Expand All @@ -33,11 +32,17 @@ public abstract class AdmissionController {
public final ClusterService clusterService;

/**
* @param rejectionCount initialised rejectionCount value for AdmissionController
* @param admissionControllerName name of the admissionController
* @param admissionControllerName name of the admissionController
* @param resourceUsageCollectorService instance used to get resource usage stats of the node
* @param rejectionCount initialised rejectionCount value for AdmissionController
* @param clusterService
*/
public AdmissionController(AtomicLong rejectionCount, String admissionControllerName, ResourceUsageCollectorService resourceUsageCollectorService, ClusterService clusterService) {
public AdmissionController(
String admissionControllerName,
ResourceUsageCollectorService resourceUsageCollectorService,
AtomicLong rejectionCount,
ClusterService clusterService
) {
this.rejectionCount = rejectionCount;
this.admissionControllerName = admissionControllerName;
this.resourceUsageCollectorService = resourceUsageCollectorService;
Expand All @@ -62,8 +67,7 @@ public Boolean isAdmissionControllerEnforced(AdmissionControlMode admissionContr
}

/**
* Increment the tracking-objects and apply the admission control if threshold is breached.
* Mostly applicable while applying admission controller
* Apply admission control based on the resource usage for an action
*/
public abstract void apply(String action, AdmissionControlActionType admissionControlActionType);

Expand All @@ -74,9 +78,12 @@ public String getName() {
return this.admissionControllerName;
}

/**
* Add rejection count to the rejection count metric tracked by the admission controller
*/
public void addRejectionCount(String admissionControlActionType, long count) {
AtomicLong updatedCount = new AtomicLong(0);
if(this.rejectionCountMap.containsKey(admissionControlActionType)){
if (this.rejectionCountMap.containsKey(admissionControlActionType)) {
updatedCount.addAndGet(this.rejectionCountMap.get(admissionControlActionType).get());
}
updatedCount.addAndGet(count);
Expand All @@ -91,6 +98,9 @@ public long getRejectionCount(String admissionControlActionType) {
return rejectionCount.get();
}

/**
* Get rejection stats of the admission controller
*/
public Map<String, Long> getRejectionStats() {
Map<String, Long> rejectionStats = new HashMap<>();
rejectionCountMap.forEach((actionType, count) -> rejectionStats.put(actionType, count.get()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@
import org.opensearch.node.NodeResourceUsageStats;
import org.opensearch.node.ResourceUsageCollectorService;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -33,56 +30,89 @@ public class CPUBasedAdmissionController extends AdmissionController {
public CPUBasedAdmissionControllerSettings settings;

/**
*
* @param admissionControllerName State of the admission controller
* @param admissionControllerName Name of the admission controller
* @param resourceUsageCollectorService Instance used to get node resource usage stats
* @param clusterService ClusterService Instance
* @param settings Immutable settings instance
*/
public CPUBasedAdmissionController(String admissionControllerName, Settings settings, ClusterService clusterService, ResourceUsageCollectorService resourceUsageCollectorService) {
super(new AtomicLong(0), admissionControllerName, resourceUsageCollectorService, clusterService);
public CPUBasedAdmissionController(
String admissionControllerName,
ResourceUsageCollectorService resourceUsageCollectorService,
ClusterService clusterService,
Settings settings
) {
super(admissionControllerName, resourceUsageCollectorService, new AtomicLong(0), clusterService);
this.settings = new CPUBasedAdmissionControllerSettings(clusterService.getClusterSettings(), settings);
}

/**
* This function will take of applying admission controller based on CPU usage
* Apply admission control based on process CPU usage
* @param action is the transport action
*/
@Override
public void apply(String action, AdmissionControlActionType admissionControlActionType) {
// TODO Will extend this logic further currently just incrementing rejectionCount
if (this.isEnabledForTransportLayer(this.settings.getTransportLayerAdmissionControllerMode())) {
this.applyForTransportLayer(action, admissionControlActionType);
}
}

/**
* Apply transport layer admission control if configured limit has been reached
*/
private void applyForTransportLayer(String actionName, AdmissionControlActionType admissionControlActionType) {
if (isLimitsBreached(admissionControlActionType)) {
if (isLimitsBreached(actionName, admissionControlActionType)) {
this.addRejectionCount(admissionControlActionType.getType(), 1);
if (this.isAdmissionControllerEnforced(this.settings.getTransportLayerAdmissionControllerMode())) {
throw new OpenSearchRejectedExecutionException("Action ["+ actionName +"] was rejected due to CPU usage admission controller limit breached");
throw new OpenSearchRejectedExecutionException(
String.format("CPU usage admission controller limit reached for action [%s]", admissionControlActionType.name())
);
}
}
}

private boolean isLimitsBreached(AdmissionControlActionType transportActionType) {
long maxCpuLimit = this.getCpuRejectionThreshold(transportActionType);
Optional<NodeResourceUsageStats> nodePerformanceStatistics = this.resourceUsageCollectorService.getNodeStatistics(this.clusterService.state().nodes().getLocalNodeId());
if(nodePerformanceStatistics.isPresent()) {
double cpuUsage = nodePerformanceStatistics.get().getCpuUtilizationPercent();
if (cpuUsage >= maxCpuLimit){
LOGGER.warn("CpuBasedAdmissionController rejected the request as the current CPU usage [" +
cpuUsage + "%] exceeds the allowed limit [" + maxCpuLimit + "%]");
return true;
/**
* Check if the configured resource usage limits are breached for the action
*/
private boolean isLimitsBreached(String actionName, AdmissionControlActionType admissionControlActionType) {
// check if cluster state is ready
if (clusterService.state() != null && clusterService.state().nodes() != null) {
long maxCpuLimit = this.getCpuRejectionThreshold(admissionControlActionType);
Optional<NodeResourceUsageStats> nodePerformanceStatistics = this.resourceUsageCollectorService.getNodeStatistics(
this.clusterService.state().nodes().getLocalNodeId()
);
if (nodePerformanceStatistics.isPresent()) {
double cpuUsage = nodePerformanceStatistics.get().getCpuUtilizationPercent();
if (cpuUsage >= maxCpuLimit) {
LOGGER.warn(
"CpuBasedAdmissionController rejected the request as the current CPU "
+ "usage [{}] exceeds the allowed limit [{}] for transport action [{}]",
cpuUsage,
maxCpuLimit,
actionName
);
return true;
}
}
}
return false;
}
private long getCpuRejectionThreshold(AdmissionControlActionType transportActionType) {
switch (transportActionType) {

/**
* Get CPU rejection threshold based on action type
*/
private long getCpuRejectionThreshold(AdmissionControlActionType admissionControlActionType) {
switch (admissionControlActionType) {
case SEARCH:
return this.settings.getSearchCPULimit();
case INDEXING:
return this.settings.getIndexingCPULimit();
default:
throw new IllegalArgumentException("Not Supported TransportAction Type: " + transportActionType.getType());
throw new IllegalArgumentException(
String.format(
"Admission control not Supported for AdmissionControlActionType: %s",
admissionControlActionType.getType()
)
);
}
}
}
Loading

0 comments on commit b095493

Please sign in to comment.