From 8c5e6ede466f11d4047233f007e62d148f6eefd7 Mon Sep 17 00:00:00 2001 From: Ajay Kumar Movva Date: Mon, 8 Jan 2024 15:28:25 +0530 Subject: [PATCH] Thread Context POC changes Signed-off-by: Ajay Kumar Movva --- .../coordination/FollowersChecker.java | 6 +- .../cluster/coordination/LeaderChecker.java | 6 +- .../common/util/concurrent/ThreadContext.java | 3 +- .../ThreadContextStatePropagator.java | 5 +- .../main/java/org/opensearch/node/Node.java | 11 +- .../node/NodeResourceUsageStats.java | 13 ++- .../node/ResourceUsageCollectorService.java | 7 ++ .../AdmissionControlInterceptSender.java | 95 ++++++++++++++++ .../AdmissionControlTransportInterceptor.java | 31 +++++- .../tasks/ResourceUsageStatsTCPropagator.java | 40 +++++++ .../ResourceUsageStatsReference.java | 31 ++++++ .../transport/TransportService.java | 102 ++++++++++++++++-- .../java/org/opensearch/node/MockNode.java | 6 +- 13 files changed, 320 insertions(+), 36 deletions(-) create mode 100644 server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlInterceptSender.java create mode 100644 server/src/main/java/org/opensearch/tasks/ResourceUsageStatsTCPropagator.java create mode 100644 server/src/main/java/org/opensearch/transport/ResourceUsageStatsReference.java diff --git a/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java b/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java index 70bb0515bb022..e934f721cf939 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java @@ -89,7 +89,7 @@ public class FollowersChecker { // the time between checks sent to each node public static final Setting FOLLOWER_CHECK_INTERVAL_SETTING = Setting.timeSetting( "cluster.fault_detection.follower_check.interval", - TimeValue.timeValueMillis(1000), + TimeValue.timeValueMillis(100000), TimeValue.timeValueMillis(100), Setting.Property.NodeScope ); @@ -97,9 +97,9 @@ public class FollowersChecker { // the timeout for each check sent to each node public static final Setting FOLLOWER_CHECK_TIMEOUT_SETTING = Setting.timeSetting( "cluster.fault_detection.follower_check.timeout", - TimeValue.timeValueMillis(10000), + TimeValue.timeValueMillis(100000), TimeValue.timeValueMillis(1), - TimeValue.timeValueMillis(60000), + TimeValue.timeValueMillis(6000000), Setting.Property.NodeScope, Setting.Property.Dynamic ); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java index 8d4373b865f62..5f864785ad847 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java @@ -88,7 +88,7 @@ public class LeaderChecker { // the time between checks sent to the leader public static final Setting LEADER_CHECK_INTERVAL_SETTING = Setting.timeSetting( "cluster.fault_detection.leader_check.interval", - TimeValue.timeValueMillis(1000), + TimeValue.timeValueMillis(100000), TimeValue.timeValueMillis(100), Setting.Property.NodeScope ); @@ -96,9 +96,9 @@ public class LeaderChecker { // the timeout for each check sent to the leader public static final Setting LEADER_CHECK_TIMEOUT_SETTING = Setting.timeSetting( "cluster.fault_detection.leader_check.timeout", - TimeValue.timeValueMillis(10000), + TimeValue.timeValueMillis(100000), TimeValue.timeValueMillis(1), - TimeValue.timeValueMillis(60000), + TimeValue.timeValueMillis(600000), Setting.Property.NodeScope, Setting.Property.Dynamic ); diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java index 6580b0e0085ef..eb670e262d8f4 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java @@ -45,6 +45,7 @@ import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.http.HttpTransportSettings; +import org.opensearch.tasks.ResourceUsageStatsTCPropagator; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskThreadContextStatePropagator; @@ -128,7 +129,7 @@ public ThreadContext(Settings settings) { this.threadLocal = ThreadLocal.withInitial(() -> DEFAULT_CONTEXT); this.maxWarningHeaderCount = SETTING_HTTP_MAX_WARNING_HEADER_COUNT.get(settings); this.maxWarningHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE.get(settings).getBytes(); - this.propagators = new CopyOnWriteArrayList<>(List.of(new TaskThreadContextStatePropagator())); + this.propagators = new CopyOnWriteArrayList<>(List.of(new TaskThreadContextStatePropagator(), new ResourceUsageStatsTCPropagator())); } public void registerThreadContextStatePropagator(final ThreadContextStatePropagator propagator) { diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContextStatePropagator.java b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContextStatePropagator.java index e8c12ae13d5eb..55de321a993a6 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContextStatePropagator.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContextStatePropagator.java @@ -10,6 +10,7 @@ import org.opensearch.common.annotation.PublicApi; +import java.util.HashMap; import java.util.Map; /** @@ -26,7 +27,7 @@ public interface ThreadContextStatePropagator { * @param source current context transient headers * @return the list of transient headers that needs to be propagated from current context to new thread context */ - @Deprecated(since = "2.12.0", forRemoval = true) +// @Deprecated(since = "2.12.0", forRemoval = true) Map transients(Map source); /** @@ -46,7 +47,7 @@ default Map transients(Map source, boolean isSys * @param source current context headers * @return the list of request headers that needs to be propagated from current context to request */ - @Deprecated(since = "2.12.0", forRemoval = true) +// @Deprecated(since = "2.12.0", forRemoval = true) Map headers(Map source); /** diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 547f610f4a752..90608627e2dac 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -932,7 +932,8 @@ protected Node( ); AdmissionControlTransportInterceptor admissionControlTransportInterceptor = new AdmissionControlTransportInterceptor( - admissionControlService + admissionControlService, + threadPool ); List transportInterceptors = List.of(admissionControlTransportInterceptor); @@ -981,7 +982,8 @@ protected Node( localNodeFactory, settingsModule.getClusterSettings(), taskHeaders, - tracer + tracer, + resourceUsageCollectorService ); TopNSearchTasksLogger taskConsumer = new TopNSearchTasksLogger(settings, settingsModule.getClusterSettings()); transportService.getTaskManager().registerTaskResourceConsumer(taskConsumer); @@ -1344,9 +1346,10 @@ protected TransportService newTransportService( Function localNodeFactory, ClusterSettings clusterSettings, Set taskHeaders, - Tracer tracer + Tracer tracer, + ResourceUsageCollectorService resourceUsageCollectorService ) { - return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, tracer); + return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, tracer, resourceUsageCollectorService); } protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) { diff --git a/server/src/main/java/org/opensearch/node/NodeResourceUsageStats.java b/server/src/main/java/org/opensearch/node/NodeResourceUsageStats.java index 6ef66d4ac1914..701de60e860b5 100644 --- a/server/src/main/java/org/opensearch/node/NodeResourceUsageStats.java +++ b/server/src/main/java/org/opensearch/node/NodeResourceUsageStats.java @@ -49,13 +49,12 @@ public void writeTo(StreamOutput out) throws IOException { @Override public String toString() { - StringBuilder sb = new StringBuilder("NodeResourceUsageStats["); - sb.append(nodeId).append("]("); - sb.append("Timestamp: ").append(timestamp); - sb.append(", CPU utilization percent: ").append(String.format(Locale.ROOT, "%.1f", cpuUtilizationPercent)); - sb.append(", Memory utilization percent: ").append(String.format(Locale.ROOT, "%.1f", memoryUtilizationPercent)); - sb.append(")"); - return sb.toString(); + return nodeId + ":" + + timestamp + + "," + + memoryUtilizationPercent + + "," + + cpuUtilizationPercent; } NodeResourceUsageStats(NodeResourceUsageStats nodeResourceUsageStats) { diff --git a/server/src/main/java/org/opensearch/node/ResourceUsageCollectorService.java b/server/src/main/java/org/opensearch/node/ResourceUsageCollectorService.java index f1c763e09f147..17c94c20f8f6d 100644 --- a/server/src/main/java/org/opensearch/node/ResourceUsageCollectorService.java +++ b/server/src/main/java/org/opensearch/node/ResourceUsageCollectorService.java @@ -113,6 +113,13 @@ public Optional getNodeStatistics(final String nodeId) { .map(resourceUsageStats -> new NodeResourceUsageStats(resourceUsageStats)); } + public Optional getLocalNodeStatistics() { + if(clusterService.state() != null) { + return Optional.ofNullable(nodeIdToResourceUsageStats.get(clusterService.state().nodes().getLocalNodeId())) + .map(resourceUsageStats -> new NodeResourceUsageStats(resourceUsageStats)); + } + return Optional.empty(); + } /** * Returns collected resource usage statistics of all nodes */ diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlInterceptSender.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlInterceptSender.java new file mode 100644 index 0000000000000..1f34aa91c3857 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlInterceptSender.java @@ -0,0 +1,95 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.transport; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.transport.TransportResponse; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.*; + +import java.io.IOException; + +public class AdmissionControlInterceptSender { + + ThreadPool threadPool; + public AdmissionControlInterceptSender(ThreadPool threadPool) { + this.threadPool = threadPool; + } + private static final Logger logger = LogManager.getLogger(AdmissionControlInterceptSender.class); + public void sendRequestDecorate( + TransportInterceptor.AsyncSender sender, + Transport.Connection connection, + String action, + TransportRequest request, + TransportRequestOptions options, + TransportResponseHandler handler + ) { + try (ThreadContext.StoredContext stashedContext = this.getThreadContext().stashContext()) { + if(isActionIndexingOrSearch(action)){ + logger.info("AdmissionControlInterceptSender is Triggered Action: {}", action); + } + RestoringTransportResponseHandler restoringTransportResponseHandler = new RestoringTransportResponseHandler(handler, stashedContext, action); + sender.sendRequest(connection, action, request, options, restoringTransportResponseHandler); + } + } + + private boolean isActionIndexingOrSearch(String action) { + return action.startsWith("indices:data/read/search") || action.startsWith("indices:data/write/bulk"); + } + + private ThreadContext getThreadContext() { + return threadPool.getThreadContext(); + } + + private static class RestoringTransportResponseHandler implements TransportResponseHandler { + + private final ThreadContext.StoredContext contextToRestore; + private final TransportResponseHandler innerHandler; + + private final String action; + + private RestoringTransportResponseHandler(TransportResponseHandler innerHandler, ThreadContext.StoredContext contextToRestore, String action) { + this.contextToRestore = contextToRestore; + this.innerHandler = innerHandler; + this.action = action; + } + + @Override + public T read(StreamInput in) throws IOException { + return innerHandler.read(in); + } + + @Override + public void handleResponse(T response) { + if (this.isActionIndexingOrSearch(this.action)){ + logger.info("Handle Response Triggered in: RestoringTransportResponseHandler"); + } + contextToRestore.restore(); + innerHandler.handleResponse(response); + } + + @Override + public void handleException(TransportException e) { + contextToRestore.restore(); + innerHandler.handleException(e); + } + + @Override + public String executor() { + return innerHandler.executor(); + } + + private boolean isActionIndexingOrSearch(String action) { + return action.startsWith("indices:data/read/search") || action.startsWith("indices:data/write/bulk"); + } + } +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java index ae1520bca769d..b04c636c678f8 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java @@ -8,11 +8,13 @@ package org.opensearch.ratelimitting.admissioncontrol.transport; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.core.transport.TransportResponse; import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; -import org.opensearch.transport.TransportInterceptor; -import org.opensearch.transport.TransportRequest; -import org.opensearch.transport.TransportRequestHandler; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.*; /** * This class allows throttling by intercepting requests on both the sender and the receiver side. @@ -20,9 +22,13 @@ public class AdmissionControlTransportInterceptor implements TransportInterceptor { AdmissionControlService admissionControlService; + AdmissionControlInterceptSender admissionControlInterceptSender; - public AdmissionControlTransportInterceptor(AdmissionControlService admissionControlService) { + private static final Logger logger = LogManager.getLogger(AdmissionControlTransportInterceptor.class); + + public AdmissionControlTransportInterceptor(AdmissionControlService admissionControlService, ThreadPool threadPool) { this.admissionControlService = admissionControlService; + admissionControlInterceptSender = new AdmissionControlInterceptSender(threadPool); } /** @@ -45,4 +51,21 @@ public TransportRequestHandler interceptHandler( admissionControlActionType ); } + + @Override + public AsyncSender interceptSender(AsyncSender sender) { + logger.info("AdmissionControl Intercept Sender Initialised"); + return new AsyncSender() { + @Override + public void sendRequest( + Transport.Connection connection, + String action, + TransportRequest request, + TransportRequestOptions options, + TransportResponseHandler handler + ) { + admissionControlInterceptSender.sendRequestDecorate(sender, connection, action, request, options, handler); + } + }; + } } diff --git a/server/src/main/java/org/opensearch/tasks/ResourceUsageStatsTCPropagator.java b/server/src/main/java/org/opensearch/tasks/ResourceUsageStatsTCPropagator.java new file mode 100644 index 0000000000000..240da4b8486cc --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/ResourceUsageStatsTCPropagator.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks; + +import java.util.HashMap; +import java.util.Map; +import org.opensearch.common.util.concurrent.ThreadContextStatePropagator; + +public class ResourceUsageStatsTCPropagator implements ThreadContextStatePropagator { + public static final String NODE_RESOURCE_STATS = "PERF_STATS"; + @Override + public Map transients(Map source) { + final Map transients = new HashMap<>(); + for(Map.Entry entry : source.entrySet()) { + if(entry.getKey().startsWith(NODE_RESOURCE_STATS)) { + // key starts with prefix + transients.put(entry.getKey(), entry.getValue()); + } + } + return transients; + } + + @Override + public Map headers(Map source) { + final Map headers = new HashMap<>(); + for(Map.Entry entry : source.entrySet()) { + if(entry.getKey().startsWith(NODE_RESOURCE_STATS)) { + // key starts with prefix + headers.put(entry.getKey(), entry.getValue().toString()); + } + } + return headers; + } +} diff --git a/server/src/main/java/org/opensearch/transport/ResourceUsageStatsReference.java b/server/src/main/java/org/opensearch/transport/ResourceUsageStatsReference.java new file mode 100644 index 0000000000000..5db9b5053c7b7 --- /dev/null +++ b/server/src/main/java/org/opensearch/transport/ResourceUsageStatsReference.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport; + +public class ResourceUsageStatsReference { + private String resourceUsageStats; + + public ResourceUsageStatsReference(String stats) { + this.resourceUsageStats = stats; + } + + public String getResourceUsageStats() { + return resourceUsageStats; + } + + public void setResourceUsageStats(String stats) { + this.resourceUsageStats = new String(stats); + } + + @Override + public String toString() { + return this.resourceUsageStats; + } + +} diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index d50266d8c9e4a..29f85e72888aa 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -64,6 +64,7 @@ import org.opensearch.core.service.ReportingService; import org.opensearch.core.transport.TransportResponse; import org.opensearch.node.NodeClosedException; +import org.opensearch.node.ResourceUsageCollectorService; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskManager; @@ -78,14 +79,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.net.UnknownHostException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; +import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; @@ -115,6 +109,7 @@ public class TransportService extends AbstractLifecycleComponent protected final ThreadPool threadPool; protected final ClusterName clusterName; protected final TaskManager taskManager; + private Set admissionControlTransportActions = new HashSet<>(); private final TransportInterceptor.AsyncSender asyncSender; private final Function localNodeFactory; private final boolean remoteClusterClient; @@ -143,6 +138,7 @@ protected boolean removeEldestEntry(Map.Entry eldest) { private final RemoteClusterService remoteClusterService; private final Tracer tracer; + private final ResourceUsageCollectorService resourceUsageCollectorService; /** if set will call requests sent to this id to shortcut and executed locally */ volatile DiscoveryNode localNode = null; @@ -189,6 +185,31 @@ public static void ensureClassloaded() {} * @param clusterSettings if non null, the {@linkplain TransportService} will register with the {@link ClusterSettings} for settings * * updates for {@link TransportSettings#TRACE_LOG_EXCLUDE_SETTING} and {@link TransportSettings#TRACE_LOG_INCLUDE_SETTING}. */ + public TransportService( + Settings settings, + Transport transport, + ThreadPool threadPool, + TransportInterceptor transportInterceptor, + Function localNodeFactory, + @Nullable ClusterSettings clusterSettings, + Set taskHeaders, + Tracer tracer, + ResourceUsageCollectorService resourceUsageCollectorService + ) { + this( + settings, + transport, + threadPool, + transportInterceptor, + localNodeFactory, + clusterSettings, + taskHeaders, + new ClusterConnectionManager(settings, transport), + tracer, + resourceUsageCollectorService + ); + } + public TransportService( Settings settings, Transport transport, @@ -208,7 +229,8 @@ public TransportService( clusterSettings, taskHeaders, new ClusterConnectionManager(settings, transport), - tracer + tracer, + null ); } @@ -221,7 +243,8 @@ public TransportService( @Nullable ClusterSettings clusterSettings, Set taskHeaders, ConnectionManager connectionManager, - Tracer tracer + Tracer tracer, + ResourceUsageCollectorService resourceUsageCollectorService ) { this.transport = transport; transport.setSlowLogThreshold(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING.get(settings)); @@ -237,6 +260,7 @@ public TransportService( this.asyncSender = interceptor.interceptSender(this::sendRequestInternal); this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings); this.tracer = tracer; + this.resourceUsageCollectorService = resourceUsageCollectorService; remoteClusterService = new RemoteClusterService(settings, this); responseHandlers = transport.getResponseHandlers(); if (clusterSettings != null) { @@ -1238,6 +1262,15 @@ public void onRequestReceived(long requestId, String action) { if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) { tracerLog.trace("[{}][{}] received request", requestId, action); } + try { + if (this.isActionIndexingOrSearch(action)) { + logger.info("[{}][{}] received request", requestId, action); + } + } catch (Exception e) { + logger.debug(e.getMessage()); + } +// fetchResourceUsageStatsFromThreadContext(action); + addResourceUsageStatsToThreadContext(action); messageListener.onRequestReceived(requestId, action); } @@ -1253,6 +1286,13 @@ public void onRequestSent( if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) { tracerLog.trace("[{}][{}] sent to [{}] (timeout: [{}])", requestId, action, node, options.timeout()); } + try { + if (this.isActionIndexingOrSearch(action)) { + logger.info("[{}][{}] sent to [{}] (timeout: [{}])", requestId, action, node, options.timeout()); + } + } catch (Exception e) { + logger.debug(e.getMessage()); + } messageListener.onRequestSent(node, requestId, action, request, options); } @@ -1263,6 +1303,14 @@ public void onResponseReceived(long requestId, Transport.ResponseContext holder) } else if (tracerLog.isTraceEnabled() && shouldTraceAction(holder.action())) { tracerLog.trace("[{}][{}] received response from [{}]", requestId, holder.action(), holder.connection().getNode()); } + try { + if (this.isActionIndexingOrSearch(holder.action())) { + logger.info("[{}][{}] received response from [{}]", requestId, holder.action(), holder.connection().getNode()); + fetchResourceUsageStatsFromThreadContext(holder.action()); + } + } catch (Exception e) { + logger.debug(e.getMessage()); + } messageListener.onResponseReceived(requestId, holder); } @@ -1272,6 +1320,13 @@ public void onResponseSent(long requestId, String action, TransportResponse resp if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) { tracerLog.trace("[{}][{}] sent response", requestId, action); } + try { + if (this.isActionIndexingOrSearch(action)) { + logger.info("[{}][{}] sent response", requestId, action); + } + } catch (Exception e) { + logger.debug(e.getMessage()); + } messageListener.onResponseSent(requestId, action, response); } @@ -1692,12 +1747,14 @@ private void sendRequestAsync( delegate = new TransportResponseHandler() { @Override public void handleResponse(T response) { +// addResourceUsageStatsToThreadContext(action); unregisterChildNode.close(); handler.handleResponse(response); } @Override public void handleException(TransportException exp) { +// addResourceUsageStatsToThreadContext(action); unregisterChildNode.close(); handler.handleException(exp); } @@ -1732,4 +1789,29 @@ public String toString() { handler.handleException(te); } } + + private void addResourceUsageStatsToThreadContext(String action) { + if(this.isActionIndexingOrSearch(action)) { + logger.info("Adding Stats to Thread Context: {}", action); + ThreadContext threadContext = threadPool.getThreadContext(); + String key = "PERF_STATS_" + this.localNode.getId(); + if(resourceUsageCollectorService.getLocalNodeStatistics().isPresent()){ + threadContext.putTransient(key, resourceUsageCollectorService.getLocalNodeStatistics().get().toString()); + } + } + } + + private void fetchResourceUsageStatsFromThreadContext(String action) { + if(this.isActionIndexingOrSearch(action)) { + logger.info("Fetching Stats From Context: {}" , action); + ThreadContext threadContext = threadPool.getThreadContext(); + threadContext.getHeaders().forEach((key, value) -> { + logger.info("Header: {} Value: {}", key, value); + }); + } + } + + private boolean isActionIndexingOrSearch(String action) { + return action.startsWith("indices:data/read/search") || action.startsWith("indices:data/write/bulk"); + } } diff --git a/test/framework/src/main/java/org/opensearch/node/MockNode.java b/test/framework/src/main/java/org/opensearch/node/MockNode.java index e6c7e21d5b3ea..a567f7a2e8639 100644 --- a/test/framework/src/main/java/org/opensearch/node/MockNode.java +++ b/test/framework/src/main/java/org/opensearch/node/MockNode.java @@ -201,7 +201,8 @@ protected TransportService newTransportService( Function localNodeFactory, ClusterSettings clusterSettings, Set taskHeaders, - Tracer tracer + Tracer tracer, + ResourceUsageCollectorService resourceUsageCollectorService ) { // we use the MockTransportService.TestPlugin class as a marker to create a network // module with this MockNetworkService. NetworkService is such an integral part of the systme @@ -216,7 +217,8 @@ protected TransportService newTransportService( localNodeFactory, clusterSettings, taskHeaders, - tracer + tracer, + resourceUsageCollectorService ); } else { return new MockTransportService(