Skip to content

Commit

Permalink
Thread Context POC changes
Browse files Browse the repository at this point in the history
Signed-off-by: Ajay Kumar Movva <[email protected]>
  • Loading branch information
Ajay Kumar Movva committed Feb 9, 2024
1 parent 30c4210 commit 8c5e6ed
Show file tree
Hide file tree
Showing 13 changed files with 320 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,17 @@ public class FollowersChecker {
// the time between checks sent to each node
public static final Setting<TimeValue> FOLLOWER_CHECK_INTERVAL_SETTING = Setting.timeSetting(
"cluster.fault_detection.follower_check.interval",
TimeValue.timeValueMillis(1000),
TimeValue.timeValueMillis(100000),
TimeValue.timeValueMillis(100),
Setting.Property.NodeScope
);

// the timeout for each check sent to each node
public static final Setting<TimeValue> FOLLOWER_CHECK_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.fault_detection.follower_check.timeout",
TimeValue.timeValueMillis(10000),
TimeValue.timeValueMillis(100000),
TimeValue.timeValueMillis(1),
TimeValue.timeValueMillis(60000),
TimeValue.timeValueMillis(6000000),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,17 @@ public class LeaderChecker {
// the time between checks sent to the leader
public static final Setting<TimeValue> LEADER_CHECK_INTERVAL_SETTING = Setting.timeSetting(
"cluster.fault_detection.leader_check.interval",
TimeValue.timeValueMillis(1000),
TimeValue.timeValueMillis(100000),
TimeValue.timeValueMillis(100),
Setting.Property.NodeScope
);

// the timeout for each check sent to the leader
public static final Setting<TimeValue> LEADER_CHECK_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.fault_detection.leader_check.timeout",
TimeValue.timeValueMillis(10000),
TimeValue.timeValueMillis(100000),
TimeValue.timeValueMillis(1),
TimeValue.timeValueMillis(60000),
TimeValue.timeValueMillis(600000),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.http.HttpTransportSettings;
import org.opensearch.tasks.ResourceUsageStatsTCPropagator;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskThreadContextStatePropagator;

Expand Down Expand Up @@ -128,7 +129,7 @@ public ThreadContext(Settings settings) {
this.threadLocal = ThreadLocal.withInitial(() -> DEFAULT_CONTEXT);
this.maxWarningHeaderCount = SETTING_HTTP_MAX_WARNING_HEADER_COUNT.get(settings);
this.maxWarningHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE.get(settings).getBytes();
this.propagators = new CopyOnWriteArrayList<>(List.of(new TaskThreadContextStatePropagator()));
this.propagators = new CopyOnWriteArrayList<>(List.of(new TaskThreadContextStatePropagator(), new ResourceUsageStatsTCPropagator()));
}

public void registerThreadContextStatePropagator(final ThreadContextStatePropagator propagator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.opensearch.common.annotation.PublicApi;

import java.util.HashMap;
import java.util.Map;

/**
Expand All @@ -26,7 +27,7 @@ public interface ThreadContextStatePropagator {
* @param source current context transient headers
* @return the list of transient headers that needs to be propagated from current context to new thread context
*/
@Deprecated(since = "2.12.0", forRemoval = true)
// @Deprecated(since = "2.12.0", forRemoval = true)
Map<String, Object> transients(Map<String, Object> source);

/**
Expand All @@ -46,7 +47,7 @@ default Map<String, Object> transients(Map<String, Object> source, boolean isSys
* @param source current context headers
* @return the list of request headers that needs to be propagated from current context to request
*/
@Deprecated(since = "2.12.0", forRemoval = true)
// @Deprecated(since = "2.12.0", forRemoval = true)
Map<String, String> headers(Map<String, Object> source);

/**
Expand Down
11 changes: 7 additions & 4 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,8 @@ protected Node(
);

AdmissionControlTransportInterceptor admissionControlTransportInterceptor = new AdmissionControlTransportInterceptor(
admissionControlService
admissionControlService,
threadPool
);

List<TransportInterceptor> transportInterceptors = List.of(admissionControlTransportInterceptor);
Expand Down Expand Up @@ -981,7 +982,8 @@ protected Node(
localNodeFactory,
settingsModule.getClusterSettings(),
taskHeaders,
tracer
tracer,
resourceUsageCollectorService
);
TopNSearchTasksLogger taskConsumer = new TopNSearchTasksLogger(settings, settingsModule.getClusterSettings());
transportService.getTaskManager().registerTaskResourceConsumer(taskConsumer);
Expand Down Expand Up @@ -1344,9 +1346,10 @@ protected TransportService newTransportService(
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
ClusterSettings clusterSettings,
Set<String> taskHeaders,
Tracer tracer
Tracer tracer,
ResourceUsageCollectorService resourceUsageCollectorService
) {
return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, tracer);
return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, tracer, resourceUsageCollectorService);
}

protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,12 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public String toString() {
StringBuilder sb = new StringBuilder("NodeResourceUsageStats[");
sb.append(nodeId).append("](");
sb.append("Timestamp: ").append(timestamp);
sb.append(", CPU utilization percent: ").append(String.format(Locale.ROOT, "%.1f", cpuUtilizationPercent));
sb.append(", Memory utilization percent: ").append(String.format(Locale.ROOT, "%.1f", memoryUtilizationPercent));
sb.append(")");
return sb.toString();
return nodeId + ":" +
timestamp +
"," +
memoryUtilizationPercent +
"," +
cpuUtilizationPercent;
}

NodeResourceUsageStats(NodeResourceUsageStats nodeResourceUsageStats) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ public Optional<NodeResourceUsageStats> getNodeStatistics(final String nodeId) {
.map(resourceUsageStats -> new NodeResourceUsageStats(resourceUsageStats));
}

public Optional<NodeResourceUsageStats> getLocalNodeStatistics() {
if(clusterService.state() != null) {
return Optional.ofNullable(nodeIdToResourceUsageStats.get(clusterService.state().nodes().getLocalNodeId()))
.map(resourceUsageStats -> new NodeResourceUsageStats(resourceUsageStats));
}
return Optional.empty();
}
/**
* Returns collected resource usage statistics of all nodes
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ratelimitting.admissioncontrol.transport;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.*;

import java.io.IOException;

public class AdmissionControlInterceptSender {

ThreadPool threadPool;
public AdmissionControlInterceptSender(ThreadPool threadPool) {
this.threadPool = threadPool;
}
private static final Logger logger = LogManager.getLogger(AdmissionControlInterceptSender.class);
public <T extends TransportResponse> void sendRequestDecorate(
TransportInterceptor.AsyncSender sender,
Transport.Connection connection,
String action,
TransportRequest request,
TransportRequestOptions options,
TransportResponseHandler<T> handler
) {
try (ThreadContext.StoredContext stashedContext = this.getThreadContext().stashContext()) {
if(isActionIndexingOrSearch(action)){
logger.info("AdmissionControlInterceptSender is Triggered Action: {}", action);
}
RestoringTransportResponseHandler restoringTransportResponseHandler = new RestoringTransportResponseHandler(handler, stashedContext, action);
sender.sendRequest(connection, action, request, options, restoringTransportResponseHandler);
}
}

private boolean isActionIndexingOrSearch(String action) {
return action.startsWith("indices:data/read/search") || action.startsWith("indices:data/write/bulk");
}

private ThreadContext getThreadContext() {
return threadPool.getThreadContext();
}

private static class RestoringTransportResponseHandler<T extends TransportResponse> implements TransportResponseHandler<T> {

private final ThreadContext.StoredContext contextToRestore;
private final TransportResponseHandler<T> innerHandler;

private final String action;

private RestoringTransportResponseHandler(TransportResponseHandler<T> innerHandler, ThreadContext.StoredContext contextToRestore, String action) {
this.contextToRestore = contextToRestore;
this.innerHandler = innerHandler;
this.action = action;
}

@Override
public T read(StreamInput in) throws IOException {
return innerHandler.read(in);
}

@Override
public void handleResponse(T response) {
if (this.isActionIndexingOrSearch(this.action)){
logger.info("Handle Response Triggered in: RestoringTransportResponseHandler");
}
contextToRestore.restore();
innerHandler.handleResponse(response);
}

@Override
public void handleException(TransportException e) {
contextToRestore.restore();
innerHandler.handleException(e);
}

@Override
public String executor() {
return innerHandler.executor();
}

private boolean isActionIndexingOrSearch(String action) {
return action.startsWith("indices:data/read/search") || action.startsWith("indices:data/write/bulk");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,27 @@

package org.opensearch.ratelimitting.admissioncontrol.transport;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.transport.TransportInterceptor;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestHandler;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.*;

/**
* This class allows throttling by intercepting requests on both the sender and the receiver side.
*/
public class AdmissionControlTransportInterceptor implements TransportInterceptor {

AdmissionControlService admissionControlService;
AdmissionControlInterceptSender admissionControlInterceptSender;

public AdmissionControlTransportInterceptor(AdmissionControlService admissionControlService) {
private static final Logger logger = LogManager.getLogger(AdmissionControlTransportInterceptor.class);

public AdmissionControlTransportInterceptor(AdmissionControlService admissionControlService, ThreadPool threadPool) {
this.admissionControlService = admissionControlService;
admissionControlInterceptSender = new AdmissionControlInterceptSender(threadPool);
}

/**
Expand All @@ -45,4 +51,21 @@ public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
admissionControlActionType
);
}

@Override
public AsyncSender interceptSender(AsyncSender sender) {
logger.info("AdmissionControl Intercept Sender Initialised");
return new AsyncSender() {
@Override
public <T extends TransportResponse> void sendRequest(
Transport.Connection connection,
String action,
TransportRequest request,
TransportRequestOptions options,
TransportResponseHandler<T> handler
) {
admissionControlInterceptSender.sendRequestDecorate(sender, connection, action, request, options, handler);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks;

import java.util.HashMap;
import java.util.Map;
import org.opensearch.common.util.concurrent.ThreadContextStatePropagator;

public class ResourceUsageStatsTCPropagator implements ThreadContextStatePropagator {
public static final String NODE_RESOURCE_STATS = "PERF_STATS";
@Override
public Map<String, Object> transients(Map<String, Object> source) {
final Map<String, Object> transients = new HashMap<>();
for(Map.Entry<String, Object> entry : source.entrySet()) {
if(entry.getKey().startsWith(NODE_RESOURCE_STATS)) {
// key starts with prefix
transients.put(entry.getKey(), entry.getValue());
}
}
return transients;
}

@Override
public Map<String, String> headers(Map<String, Object> source) {
final Map<String, String> headers = new HashMap<>();
for(Map.Entry<String, Object> entry : source.entrySet()) {
if(entry.getKey().startsWith(NODE_RESOURCE_STATS)) {
// key starts with prefix
headers.put(entry.getKey(), entry.getValue().toString());
}
}
return headers;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport;

public class ResourceUsageStatsReference {
private String resourceUsageStats;

public ResourceUsageStatsReference(String stats) {
this.resourceUsageStats = stats;
}

public String getResourceUsageStats() {
return resourceUsageStats;
}

public void setResourceUsageStats(String stats) {
this.resourceUsageStats = new String(stats);
}

@Override
public String toString() {
return this.resourceUsageStats;
}

}
Loading

0 comments on commit 8c5e6ed

Please sign in to comment.