Skip to content

Commit

Permalink
Added changes to integrade cpu AC to ResourceUsageCollector and Emit …
Browse files Browse the repository at this point in the history
…Stats
  • Loading branch information
Ajay Kumar Movva committed Oct 19, 2023
1 parent 2759b70 commit aab2010
Show file tree
Hide file tree
Showing 46 changed files with 546 additions and 1,839 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.opensearch.monitor.process.ProcessStats;
import org.opensearch.node.AdaptiveSelectionStats;
import org.opensearch.node.NodesResourceUsageStats;
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats;
import org.opensearch.script.ScriptCacheStats;
import org.opensearch.script.ScriptStats;
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
Expand Down Expand Up @@ -146,6 +147,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private NodesResourceUsageStats resourceUsageStats;

@Nullable
private AdmissionControlStats admissionControlStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -207,6 +211,11 @@ public NodeStats(StreamInput in) throws IOException {
} else {
resourceUsageStats = null;
}
if(in.getVersion().onOrAfter(Version.V_3_0_0)) {
admissionControlStats = in.readOptionalWriteable(AdmissionControlStats::new);
} else {
admissionControlStats = null;
}
}

public NodeStats(
Expand Down Expand Up @@ -234,7 +243,8 @@ public NodeStats(
@Nullable WeightedRoutingStats weightedRoutingStats,
@Nullable FileCacheStats fileCacheStats,
@Nullable TaskCancellationStats taskCancellationStats,
@Nullable SearchPipelineStats searchPipelineStats
@Nullable SearchPipelineStats searchPipelineStats,
@Nullable AdmissionControlStats admissionControlStats
) {
super(node);
this.timestamp = timestamp;
Expand All @@ -261,6 +271,7 @@ public NodeStats(
this.fileCacheStats = fileCacheStats;
this.taskCancellationStats = taskCancellationStats;
this.searchPipelineStats = searchPipelineStats;
this.admissionControlStats = admissionControlStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -403,6 +414,11 @@ public SearchPipelineStats getSearchPipelineStats() {
return searchPipelineStats;
}

@Nullable
public AdmissionControlStats getAdmissionControlStats() {
return admissionControlStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down Expand Up @@ -449,6 +465,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_3_0_0)) { // make it 2.12 when we backport
out.writeOptionalWriteable(resourceUsageStats);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalWriteable(admissionControlStats);
}
}

@Override
Expand Down Expand Up @@ -542,6 +561,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getResourceUsageStats() != null) {
getResourceUsageStats().toXContent(builder, params);
}
if (getAdmissionControlStats() != null) {
getAdmissionControlStats().toXContent(builder, params);
}
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ public enum Metric {
FILE_CACHE_STATS("file_cache"),
TASK_CANCELLATION("task_cancellation"),
SEARCH_PIPELINE("search_pipeline"),
RESOURCE_USAGE_STATS("resource_usage_stats");
RESOURCE_USAGE_STATS("resource_usage_stats"),
ADMISSION_CONTROL("admission_control");

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics),
NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics),
NodesStatsRequest.Metric.RESOURCE_USAGE_STATS.containedIn(metrics)
NodesStatsRequest.Metric.RESOURCE_USAGE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.ADMISSION_CONTROL.containedIn(metrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.SearchService;
import org.opensearch.search.dfs.DfsSearchResult;
Expand Down Expand Up @@ -542,6 +543,9 @@ public static void registerRequestHandler(TransportService transportService, Sea
transportService.registerRequestHandler(
DFS_ACTION_NAME,
ThreadPool.Names.SAME,
false,
true,
AdmissionControlActionType.SEARCH,
ShardSearchRequest::new,
(request, channel, task) -> searchService.executeDfsPhase(
request,
Expand All @@ -556,6 +560,9 @@ public static void registerRequestHandler(TransportService transportService, Sea
transportService.registerRequestHandler(
QUERY_ACTION_NAME,
ThreadPool.Names.SAME,
false,
true,
AdmissionControlActionType.SEARCH,
ShardSearchRequest::new,
(request, channel, task) -> {
searchService.executeQueryPhase(
Expand All @@ -575,6 +582,9 @@ public static void registerRequestHandler(TransportService transportService, Sea
transportService.registerRequestHandler(
QUERY_ID_ACTION_NAME,
ThreadPool.Names.SAME,
false,
true,
AdmissionControlActionType.SEARCH,
QuerySearchRequest::new,
(request, channel, task) -> {
searchService.executeQueryPhase(
Expand Down Expand Up @@ -633,6 +643,7 @@ public static void registerRequestHandler(TransportService transportService, Sea
ThreadPool.Names.SAME,
true,
true,
AdmissionControlActionType.SEARCH,
ShardFetchSearchRequest::new,
(request, channel, task) -> {
searchService.executeFetchPhase(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.action.UnavailableShardsException;
import org.opensearch.action.bulk.BulkAction;
import org.opensearch.action.bulk.TransportShardBulkAction;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.action.support.ChannelActionListener;
Expand Down Expand Up @@ -82,6 +84,7 @@
import org.opensearch.indices.IndexClosedException;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.NodeClosedException;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ConnectTransportException;
Expand Down Expand Up @@ -219,14 +222,26 @@ protected TransportReplicationAction(

transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest);

transportService.registerRequestHandler(
transportPrimaryAction,
executor,
forceExecutionOnPrimary,
true,
in -> new ConcreteShardRequest<>(requestReader, in),
this::handlePrimaryRequest
);
if(transportPrimaryAction.equals(TransportShardBulkAction.ACTION_NAME + PRIMARY_ACTION_SUFFIX)){
transportService.registerRequestHandler(
transportPrimaryAction,
executor,
forceExecutionOnPrimary,
true,
AdmissionControlActionType.INDEXING,
in -> new ConcreteShardRequest<>(requestReader, in),
this::handlePrimaryRequest
);
} else {
transportService.registerRequestHandler(
transportPrimaryAction,
executor,
forceExecutionOnPrimary,
true,
in -> new ConcreteShardRequest<>(requestReader, in),
this::handlePrimaryRequest
);
}

// we must never reject on because of thread pool capacity on replicas
transportService.registerRequestHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.opensearch.http.HttpServerTransport;
import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.tasks.RawTaskStatus;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.Tracer;
Expand Down Expand Up @@ -300,6 +301,20 @@ public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
return actualHandler;
}

@Override
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
String action,
String executor,
boolean forceExecution,
TransportRequestHandler<T> actualHandler,
AdmissionControlActionType transportActionType
) {
for (TransportInterceptor interceptor : this.transportInterceptors) {
actualHandler = interceptor.interceptHandler(action, executor, forceExecution, actualHandler, transportActionType);
}
return actualHandler;
}

@Override
public AsyncSender interceptSender(AsyncSender sender) {
for (TransportInterceptor interceptor : this.transportInterceptors) {
Expand Down
Loading

0 comments on commit aab2010

Please sign in to comment.