From d7d566054b887c89707975725daa14da8e94ba2f Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Sat, 20 Jul 2024 17:35:19 +0530 Subject: [PATCH] Adds the listener for resource utilization metrics Signed-off-by: Gagan Juneja --- .../PerformanceAnalyzerPlugin.java | 10 +- .../config/PerformanceAnalyzerController.java | 9 + .../PerformanceAnalyzerSearchListener.java | 14 +- .../RTFPerformanceAnalyzerSearchListener.java | 255 ++++++++++++++++++ ...rmanceAnalyzerTransportRequestHandler.java | 10 +- ...TFPerformanceAnalyzerTransportChannel.java | 129 +++++++++ ...rformanceAnalyzerTransportInterceptor.java | 34 +++ ...rmanceAnalyzerTransportRequestHandler.java | 124 +++++++++ .../performanceanalyzer/util/Utils.java | 15 ++ .../PerformanceAnalyzerPluginTests.java | 10 +- ...erformanceAnalyzerSearchListenerTests.java | 16 ++ ...erformanceAnalyzerSearchListenerTests.java | 163 +++++++++++ ...eAnalyzerTransportRequestHandlerTests.java | 28 ++ ...formanceAnalyzerTransportChannelTests.java | 82 ++++++ ...eAnalyzerTransportRequestHandlerTests.java | 120 +++++++++ .../performanceanalyzer/util/UtilsTests.java | 27 ++ 16 files changed, 1033 insertions(+), 13 deletions(-) create mode 100644 src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java create mode 100644 src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java create mode 100644 src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportInterceptor.java create mode 100644 src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java create mode 100644 src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java create mode 100644 src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java create mode 100644 src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandlerTests.java create mode 100644 src/test/java/org/opensearch/performanceanalyzer/util/UtilsTests.java diff --git a/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java b/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java index 143d1f1a..06f19fa2 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java +++ b/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java @@ -86,7 +86,9 @@ import org.opensearch.performanceanalyzer.http_action.whoami.TransportWhoAmIAction; import org.opensearch.performanceanalyzer.http_action.whoami.WhoAmIAction; import org.opensearch.performanceanalyzer.listener.PerformanceAnalyzerSearchListener; +import org.opensearch.performanceanalyzer.listener.RTFPerformanceAnalyzerSearchListener; import org.opensearch.performanceanalyzer.transport.PerformanceAnalyzerTransportInterceptor; +import org.opensearch.performanceanalyzer.transport.RTFPerformanceAnalyzerTransportInterceptor; import org.opensearch.performanceanalyzer.util.Utils; import org.opensearch.performanceanalyzer.writer.EventLogQueueProcessor; import org.opensearch.plugins.ActionPlugin; @@ -302,7 +304,10 @@ public List getActionFilters() { public void onIndexModule(IndexModule indexModule) { PerformanceAnalyzerSearchListener performanceanalyzerSearchListener = new PerformanceAnalyzerSearchListener(performanceAnalyzerController); + RTFPerformanceAnalyzerSearchListener rtfPerformanceAnalyzerSearchListener = + new RTFPerformanceAnalyzerSearchListener(performanceAnalyzerController); indexModule.addSearchOperationListener(performanceanalyzerSearchListener); + indexModule.addSearchOperationListener(rtfPerformanceAnalyzerSearchListener); } // follower check, leader check @@ -330,8 +335,9 @@ public void onDiscovery(Discovery discovery) { @Override public List getTransportInterceptors( NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) { - return singletonList( - new PerformanceAnalyzerTransportInterceptor(performanceAnalyzerController)); + return Arrays.asList( + new PerformanceAnalyzerTransportInterceptor(performanceAnalyzerController), + new RTFPerformanceAnalyzerTransportInterceptor(performanceAnalyzerController)); } @Override diff --git a/src/main/java/org/opensearch/performanceanalyzer/config/PerformanceAnalyzerController.java b/src/main/java/org/opensearch/performanceanalyzer/config/PerformanceAnalyzerController.java index faaebc7c..74cc3e26 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/config/PerformanceAnalyzerController.java +++ b/src/main/java/org/opensearch/performanceanalyzer/config/PerformanceAnalyzerController.java @@ -383,4 +383,13 @@ public boolean isCollectorDisabled( return disabledCollectorsList.contains(collectorName); } + + /** + * Collectors Setting value. + * + * @return collectorsSettingValue + */ + public int getCollectorsSettingValue() { + return collectorsSettingValue; + } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java b/src/main/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java index 7719cdd6..f07415f8 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java +++ b/src/main/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java @@ -7,6 +7,7 @@ import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR; +import com.google.common.annotations.VisibleForTesting; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.index.shard.SearchOperationListener; @@ -16,6 +17,7 @@ import org.opensearch.performanceanalyzer.commons.metrics.MetricsProcessor; import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics; import org.opensearch.performanceanalyzer.commons.util.ThreadIDUtil; +import org.opensearch.performanceanalyzer.commons.util.Util; import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; import org.opensearch.search.internal.SearchContext; @@ -36,8 +38,16 @@ public String toString() { return PerformanceAnalyzerSearchListener.class.getSimpleName(); } - private SearchListener getSearchListener() { - return controller.isPerformanceAnalyzerEnabled() ? this : NO_OP_SEARCH_LISTENER; + @VisibleForTesting + SearchListener getSearchListener() { + return isSearchListenerEnabled() ? this : NO_OP_SEARCH_LISTENER; + } + + private boolean isSearchListenerEnabled() { + return controller.isPerformanceAnalyzerEnabled() + && (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue() + || controller.getCollectorsSettingValue() + == Util.CollectorMode.RCA.getValue()); } @Override diff --git a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java new file mode 100644 index 00000000..059b8750 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java @@ -0,0 +1,255 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.listener; + +import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR; + +import com.google.common.annotations.VisibleForTesting; +import java.util.HashMap; +import java.util.Map; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.core.action.NotifyOnceListener; +import org.opensearch.index.shard.SearchOperationListener; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.commons.util.Util; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.performanceanalyzer.util.Utils; +import org.opensearch.search.internal.SearchContext; +import org.opensearch.tasks.Task; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; + +/** + * {@link SearchOperationListener} to capture the resource utilization of a shard search operation. + * This will be getting the resource tracking information from the {@link + * org.opensearch.tasks.TaskResourceTrackingService}. + */ +public class RTFPerformanceAnalyzerSearchListener + implements SearchOperationListener, SearchListener { + + private static final Logger LOG = + LogManager.getLogger(RTFPerformanceAnalyzerSearchListener.class); + private static final String OPERATION_SHARD_FETCH = "shard_fetch"; + private static final String OPERATION_SHARD_QUERY = "shard_query"; + public static final String QUERY_START_TIME = "query_start_time"; + public static final String FETCH_START_TIME = "fetch_start_time"; + private final ThreadLocal> threadLocal; + private static final SearchListener NO_OP_SEARCH_LISTENER = new NoOpSearchListener(); + + private final PerformanceAnalyzerController controller; + private final Histogram cpuUtilizationHistogram; + private final Histogram heapUsedHistogram; + private final int numProcessors; + + public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController controller) { + this.controller = controller; + this.cpuUtilizationHistogram = createCPUUtilizationHistogram(); + heapUsedHistogram = createHeapUsedHistogram(); + this.threadLocal = ThreadLocal.withInitial(() -> new HashMap()); + this.numProcessors = Runtime.getRuntime().availableProcessors(); + } + + private Histogram createCPUUtilizationHistogram() { + MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + RTFMetrics.OSMetrics.CPU_UTILIZATION.toString(), + "CPU Utilization per shard for an operation", + RTFMetrics.MetricUnits.RATE.toString()); + } else { + LOG.debug("MetricsRegistry is null"); + return null; + } + } + + private Histogram createHeapUsedHistogram() { + MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + RTFMetrics.HeapValue.HEAP_USED.toString(), + "Heap used per shard for an operation", + RTFMetrics.MetricUnits.BYTE.toString()); + } else { + LOG.debug("MetricsRegistry is null"); + return null; + } + } + + @Override + public String toString() { + return RTFPerformanceAnalyzerSearchListener.class.getSimpleName(); + } + + @VisibleForTesting + SearchListener getSearchListener() { + return isSearchListenerEnabled() ? this : NO_OP_SEARCH_LISTENER; + } + + private boolean isSearchListenerEnabled() { + LOG.debug( + "Controller enable status {}, CollectorMode value {}", + controller.isPerformanceAnalyzerEnabled(), + controller.getCollectorsSettingValue()); + return OpenSearchResources.INSTANCE.getMetricsRegistry() != null + && controller.isPerformanceAnalyzerEnabled() + && (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue() + || controller.getCollectorsSettingValue() + == Util.CollectorMode.TELEMETRY.getValue()); + } + + @Override + public void onPreQueryPhase(SearchContext searchContext) { + try { + getSearchListener().preQueryPhase(searchContext); + } catch (Exception ex) { + LOG.error(ex); + StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR); + } + } + + @Override + public void onQueryPhase(SearchContext searchContext, long tookInNanos) { + try { + getSearchListener().queryPhase(searchContext, tookInNanos); + } catch (Exception ex) { + LOG.error(ex); + StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR); + } + } + + @Override + public void onFailedQueryPhase(SearchContext searchContext) { + try { + getSearchListener().failedQueryPhase(searchContext); + } catch (Exception ex) { + LOG.error(ex); + StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR); + } + } + + @Override + public void onPreFetchPhase(SearchContext searchContext) { + try { + getSearchListener().preFetchPhase(searchContext); + } catch (Exception ex) { + LOG.error(ex); + StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR); + } + } + + @Override + public void onFetchPhase(SearchContext searchContext, long tookInNanos) { + try { + getSearchListener().fetchPhase(searchContext, tookInNanos); + } catch (Exception ex) { + LOG.error(ex); + StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR); + } + } + + @Override + public void onFailedFetchPhase(SearchContext searchContext) { + try { + getSearchListener().failedFetchPhase(searchContext); + } catch (Exception ex) { + LOG.error(ex); + StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR); + } + } + + @Override + public void preQueryPhase(SearchContext searchContext) { + threadLocal.get().put(QUERY_START_TIME, System.nanoTime()); + } + + @Override + public void queryPhase(SearchContext searchContext, long tookInNanos) { + long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l); + addResourceTrackingCompletionListener( + searchContext, queryStartTime, OPERATION_SHARD_QUERY, false); + } + + @Override + public void failedQueryPhase(SearchContext searchContext) { + long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l); + addResourceTrackingCompletionListener( + searchContext, queryStartTime, OPERATION_SHARD_QUERY, true); + } + + @Override + public void preFetchPhase(SearchContext searchContext) { + threadLocal.get().put(FETCH_START_TIME, System.nanoTime()); + } + + @Override + public void fetchPhase(SearchContext searchContext, long tookInNanos) { + long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, 0l); + addResourceTrackingCompletionListener( + searchContext, fetchStartTime, OPERATION_SHARD_FETCH, false); + } + + @Override + public void failedFetchPhase(SearchContext searchContext) { + long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, 0l); + addResourceTrackingCompletionListener( + searchContext, fetchStartTime, OPERATION_SHARD_FETCH, true); + } + + private void addResourceTrackingCompletionListener( + SearchContext searchContext, long startTime, String operation, boolean isFailed) { + searchContext + .getTask() + .addResourceTrackingCompletionListener( + createListener( + searchContext, + (System.nanoTime() - startTime), + operation, + isFailed)); + } + + @VisibleForTesting + NotifyOnceListener createListener( + SearchContext searchContext, long totalTime, String operation, boolean isFailed) { + return new NotifyOnceListener() { + @Override + protected void innerOnResponse(Task task) { + LOG.debug("Updating the counter for task {}", task.getId()); + cpuUtilizationHistogram.record( + Utils.calculateCPUUtilization( + numProcessors, + totalTime, + task.getTotalResourceStats().getCpuTimeInNanos()), + createTags()); + heapUsedHistogram.record( + Math.max(0, task.getTotalResourceStats().getMemoryInBytes()), createTags()); + } + + private Tags createTags() { + return Tags.create() + .addTag( + RTFMetrics.CommonDimension.INDEX_NAME.toString(), + searchContext.request().shardId().getIndex().getName()) + .addTag( + RTFMetrics.CommonDimension.INDEX_UUID.toString(), + searchContext.request().shardId().getIndex().getUUID()) + .addTag( + RTFMetrics.CommonDimension.SHARD_ID.toString(), + searchContext.request().shardId().getId()) + .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation) + .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed); + } + + @Override + protected void innerOnFailure(Exception e) { + LOG.error("Error is executing the the listener", e); + } + }; + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java b/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java index a4cd946e..ad517c74 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java @@ -13,6 +13,7 @@ import org.opensearch.action.bulk.BulkShardRequest; import org.opensearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest; import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector; +import org.opensearch.performanceanalyzer.commons.util.Util; import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportChannel; @@ -45,7 +46,7 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro @VisibleForTesting TransportChannel getChannel(T request, TransportChannel channel, Task task) { - if (!controller.isPerformanceAnalyzerEnabled()) { + if (!isCollectorEnabled()) { return channel; } @@ -56,6 +57,13 @@ TransportChannel getChannel(T request, TransportChannel channel, Task task) { } } + private boolean isCollectorEnabled() { + return controller.isPerformanceAnalyzerEnabled() + && (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue() + || controller.getCollectorsSettingValue() + == Util.CollectorMode.RCA.getValue()); + } + private TransportChannel getShardBulkChannel(T request, TransportChannel channel, Task task) { String className = request.getClass().getName(); boolean bPrimary = false; diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java new file mode 100644 index 00000000..40465497 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java @@ -0,0 +1,129 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.transport; + +import com.google.common.annotations.VisibleForTesting; +import com.sun.management.ThreadMXBean; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.core.transport.TransportResponse; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.util.Utils; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.tags.Tags; +import org.opensearch.transport.TransportChannel; + +/** + * {@link TransportChannel} implementation to override the sendResponse behavior to have handle of + * the {@link org.opensearch.action.bulk.BulkShardRequest} completion. + */ +public final class RTFPerformanceAnalyzerTransportChannel implements TransportChannel { + private static final Logger LOG = + LogManager.getLogger(RTFPerformanceAnalyzerTransportChannel.class); + + private static final ThreadMXBean threadMXBean = + (ThreadMXBean) ManagementFactory.getThreadMXBean(); + private static final String OPERATION_SHARD_BULK = "shardbulk"; + private static final String SHARD_ROLE_PRIMARY = "primary"; + private static final String SHARD_ROLE_REPLICA = "replica"; + + private long cpuStartTime; + private long operationStartTime; + + private Histogram cpuUtilizationHistogram; + + private TransportChannel original; + private String indexName; + private ShardId shardId; + private boolean primary; + + private long threadID; + private int numProcessors; + + void set( + TransportChannel original, + Histogram cpuUtilizationHistogram, + String indexName, + ShardId shardId, + boolean bPrimary) { + this.original = original; + this.cpuUtilizationHistogram = cpuUtilizationHistogram; + this.indexName = indexName; + this.shardId = shardId; + this.primary = bPrimary; + + this.operationStartTime = System.nanoTime(); + threadID = Thread.currentThread().getId(); + this.cpuStartTime = threadMXBean.getThreadCpuTime(threadID); + this.numProcessors = Runtime.getRuntime().availableProcessors(); + LOG.debug("Thread Name {}", Thread.currentThread().getName()); + } + + @Override + public String getProfileName() { + return "RTFPerformanceAnalyzerTransportChannelProfile"; + } + + @Override + public String getChannelType() { + return "RTFPerformanceAnalyzerTransportChannelType"; + } + + @Override + public void sendResponse(TransportResponse response) throws IOException { + emitMetrics(null); + original.sendResponse(response); + } + + @Override + public void sendResponse(Exception exception) throws IOException { + emitMetrics(exception); + original.sendResponse(exception); + } + + private void emitMetrics(Exception exception) { + double cpuUtilization = calculateCPUUtilization(operationStartTime, cpuStartTime); + recordCPUUtilizationMetric( + shardId, cpuUtilization, OPERATION_SHARD_BULK, exception != null); + } + + private double calculateCPUUtilization(long phaseStartTime, long phaseCPUStartTime) { + LOG.debug("Completion Thread Name {}", Thread.currentThread().getName()); + long totalCpuTime = + Math.max(0, (threadMXBean.getThreadCpuTime(threadID) - phaseCPUStartTime)); + return Utils.calculateCPUUtilization( + numProcessors, (System.nanoTime() - phaseStartTime), totalCpuTime); + } + + @VisibleForTesting + void recordCPUUtilizationMetric( + ShardId shardId, double cpuUtilization, String operation, boolean isFailed) { + cpuUtilizationHistogram.record( + cpuUtilization, + Tags.create() + .addTag( + RTFMetrics.CommonDimension.INDEX_NAME.toString(), + shardId.getIndex().getName()) + .addTag( + RTFMetrics.CommonDimension.INDEX_UUID.toString(), + shardId.getIndex().getUUID()) + .addTag(RTFMetrics.CommonDimension.SHARD_ID.toString(), shardId.getId()) + .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation) + .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed) + .addTag( + RTFMetrics.CommonDimension.SHARD_ROLE.toString(), + primary ? SHARD_ROLE_PRIMARY : SHARD_ROLE_REPLICA)); + } + + // This function is called from the security plugin using reflection. Do not + // remove this function without changing the security plugin. + public TransportChannel getInnerChannel() { + return this.original; + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportInterceptor.java b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportInterceptor.java new file mode 100644 index 00000000..5d2de2b6 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportInterceptor.java @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.transport; + +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.transport.TransportInterceptor; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +/** + * Transport Interceptor to intercept the Indexing requests and populate the resource utilization + * metrics. + */ +public final class RTFPerformanceAnalyzerTransportInterceptor implements TransportInterceptor { + + private final PerformanceAnalyzerController controller; + + public RTFPerformanceAnalyzerTransportInterceptor( + final PerformanceAnalyzerController controller) { + this.controller = controller; + } + + @Override + public TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler) { + return new RTFPerformanceAnalyzerTransportRequestHandler<>(actualHandler, controller); + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java new file mode 100644 index 00000000..c1e5b55f --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java @@ -0,0 +1,124 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.transport; + +import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.bulk.BulkShardRequest; +import org.opensearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.commons.util.Util; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.tasks.Task; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +/** + * {@link TransportRequestHandler} implementation to intercept only the {@link BulkShardRequest} and + * skip other transport calls. + * + * @param {@link TransportRequest} + */ +public final class RTFPerformanceAnalyzerTransportRequestHandler + implements TransportRequestHandler { + private static final Logger LOG = + LogManager.getLogger(RTFPerformanceAnalyzerTransportRequestHandler.class); + private final PerformanceAnalyzerController controller; + private TransportRequestHandler actualHandler; + boolean logOnce = false; + private final Histogram cpuUtilizationHistogram; + + RTFPerformanceAnalyzerTransportRequestHandler( + TransportRequestHandler actualHandler, PerformanceAnalyzerController controller) { + this.actualHandler = actualHandler; + this.controller = controller; + this.cpuUtilizationHistogram = createCPUUtilizationHistogram(); + } + + private Histogram createCPUUtilizationHistogram() { + MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + RTFMetrics.OSMetrics.CPU_UTILIZATION.toString(), + "CPU Utilization per shard for an operation", + "rate"); + } else { + return null; + } + } + + @Override + public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { + actualHandler.messageReceived(request, getChannel(request, channel, task), task); + } + + @VisibleForTesting + TransportChannel getChannel(T request, TransportChannel channel, Task task) { + if (!isCollectorEnabled()) { + return channel; + } + + if (request instanceof ConcreteShardRequest) { + return getShardBulkChannel(request, channel, task); + } else { + return channel; + } + } + + private boolean isCollectorEnabled() { + return OpenSearchResources.INSTANCE.getMetricsRegistry() != null + && controller.isPerformanceAnalyzerEnabled() + && (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue() + || controller.getCollectorsSettingValue() + == Util.CollectorMode.TELEMETRY.getValue()); + } + + private TransportChannel getShardBulkChannel(T request, TransportChannel channel, Task task) { + String className = request.getClass().getName(); + boolean bPrimary = false; + + if (className.equals( + "org.opensearch.action.support.replication.TransportReplicationAction$ConcreteShardRequest")) { + bPrimary = true; + } else if (className.equals( + "org.opensearch.action.support.replication.TransportReplicationAction$ConcreteReplicaRequest")) { + bPrimary = false; + } else { + return channel; + } + + TransportRequest transportRequest = ((ConcreteShardRequest) request).getRequest(); + + if (!(transportRequest instanceof BulkShardRequest)) { + return channel; + } + + BulkShardRequest bsr = (BulkShardRequest) transportRequest; + RTFPerformanceAnalyzerTransportChannel rtfPerformanceAnalyzerTransportChannel = + new RTFPerformanceAnalyzerTransportChannel(); + + try { + rtfPerformanceAnalyzerTransportChannel.set( + channel, cpuUtilizationHistogram, bsr.index(), bsr.shardId(), bPrimary); + } catch (Exception ex) { + if (!logOnce) { + LOG.error(ex); + logOnce = true; + } + StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR); + } + + return rtfPerformanceAnalyzerTransportChannel; + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java index e4963ebc..aca1b295 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java +++ b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java @@ -8,6 +8,8 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.action.admin.indices.stats.CommonStatsFlags; import org.opensearch.action.admin.indices.stats.IndexShardStats; @@ -27,6 +29,7 @@ import org.opensearch.performanceanalyzer.commons.stats.ServiceMetrics; public class Utils { + private static final Logger LOG = LogManager.getLogger(Utils.class); public static void configureMetrics() { ServiceMetrics.initStatsReporter(); @@ -108,4 +111,16 @@ public static HashMap getShards() { IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED); + + public static double calculateCPUUtilization( + int numProcessors, long totalOperationTime, long cpuUsageTime) { + LOG.debug("totalCpuTime {}", cpuUsageTime); + LOG.debug("totalOperationTime {}", totalOperationTime); + LOG.debug("numProcessors {}", numProcessors); + if (totalOperationTime == 0l || cpuUsageTime == 0l || numProcessors == 0) { + return 0.0d; + } + double totalAvailableCPUTime = Double.valueOf(totalOperationTime * numProcessors); + return cpuUsageTime / totalAvailableCPUTime; + } } diff --git a/src/test/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPluginTests.java b/src/test/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPluginTests.java index fc5f95fa..554eca27 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPluginTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPluginTests.java @@ -22,7 +22,6 @@ import org.opensearch.action.ActionRequest; import org.opensearch.action.support.ActionFilter; import org.opensearch.client.node.NodeClient; -import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; @@ -90,12 +89,7 @@ public void setup() { } catch (Exception e) { e.printStackTrace(); } - clusterService = - new ClusterService( - settings, - clusterSettings, - threadPool, - new ClusterManagerMetrics(metricsRegistry)); + clusterService = new ClusterService(settings, clusterSettings, threadPool); identityService = new IdentityService(Settings.EMPTY, List.of()); restController = new RestController( @@ -131,7 +125,7 @@ public void testGetActions() { @Test public void testGetTransportInterceptors() { List list = plugin.getTransportInterceptors(null, null); - assertEquals(1, list.size()); + assertEquals(2, list.size()); assertEquals(PerformanceAnalyzerTransportInterceptor.class, list.get(0).getClass()); } diff --git a/src/test/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListenerTests.java b/src/test/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListenerTests.java index cfa8bf58..fdd3aa49 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListenerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListenerTests.java @@ -25,6 +25,7 @@ import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration; import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics; import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode; +import org.opensearch.performanceanalyzer.commons.util.Util; import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; import org.opensearch.performanceanalyzer.util.TestUtil; import org.opensearch.performanceanalyzer.util.Utils; @@ -69,6 +70,21 @@ public void init() { TestUtil.readEvents(); } + @Test + public void tesSearchListener() { + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + assertTrue(searchListener.getSearchListener() instanceof NoOpSearchListener); + + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.RCA.getValue()); + assertTrue(searchListener.getSearchListener() instanceof PerformanceAnalyzerSearchListener); + + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.DUAL.getValue()); + assertTrue(searchListener.getSearchListener() instanceof PerformanceAnalyzerSearchListener); + } + @Test public void testGetMetricsPath() { String expectedPath = diff --git a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java new file mode 100644 index 00000000..208b47c6 --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java @@ -0,0 +1,163 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.listener; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.MockitoAnnotations.initMocks; + +import org.apache.commons.lang3.SystemUtils; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.core.action.NotifyOnceListener; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.commons.util.Util; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.search.internal.SearchContext; +import org.opensearch.search.internal.ShardSearchRequest; +import org.opensearch.tasks.Task; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; + +public class RTFPerformanceAnalyzerSearchListenerTests { + + private RTFPerformanceAnalyzerSearchListener searchListener; + + @Mock private SearchContext searchContext; + @Mock private ShardSearchRequest shardSearchRequest; + @Mock private ShardId shardId; + @Mock private PerformanceAnalyzerController controller; + @Mock private SearchShardTask task; + @Mock private MetricsRegistry metricsRegistry; + @Mock private Histogram cpuUtilizationHistogram; + @Mock private Histogram heapUsedHistogram; + @Mock private Index index; + + @Mock private TaskResourceUsage taskResourceUsage; + + @BeforeClass + public static void setup() { + // this test only runs in Linux system + // as some of the static members of the ThreadList class are specific to Linux + org.junit.Assume.assumeTrue(SystemUtils.IS_OS_LINUX); + } + + @Before + public void init() { + initMocks(this); + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); + Mockito.when(controller.isPerformanceAnalyzerEnabled()).thenReturn(true); + Mockito.when( + metricsRegistry.createHistogram( + Mockito.eq("CPU_Utilization"), + Mockito.anyString(), + Mockito.eq("rate"))) + .thenReturn(cpuUtilizationHistogram); + Mockito.when( + metricsRegistry.createHistogram( + Mockito.eq("heap_used"), Mockito.anyString(), Mockito.eq("B"))) + .thenReturn(heapUsedHistogram); + searchListener = new RTFPerformanceAnalyzerSearchListener(controller); + assertEquals( + RTFPerformanceAnalyzerSearchListener.class.getSimpleName(), + searchListener.toString()); + } + + @Test + public void tesSearchListener() { + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.RCA.getValue()); + assertTrue(searchListener.getSearchListener() instanceof NoOpSearchListener); + + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + assertTrue( + searchListener.getSearchListener() instanceof RTFPerformanceAnalyzerSearchListener); + + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.DUAL.getValue()); + assertTrue( + searchListener.getSearchListener() instanceof RTFPerformanceAnalyzerSearchListener); + } + + @Test + public void testQueryPhase() { + initializeValidSearchContext(true); + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + searchListener.preQueryPhase(searchContext); + searchListener.queryPhase(searchContext, 0l); + Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); + } + + @Test + public void testQueryPhaseFailed() { + initializeValidSearchContext(true); + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + searchListener.preQueryPhase(searchContext); + searchListener.failedQueryPhase(searchContext); + Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); + } + + @Test + public void testFetchPhase() { + initializeValidSearchContext(true); + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + searchListener.preFetchPhase(searchContext); + searchListener.fetchPhase(searchContext, 0l); + Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); + } + + @Test + public void testFetchPhaseFailed() { + initializeValidSearchContext(true); + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + searchListener.preFetchPhase(searchContext); + searchListener.failedFetchPhase(searchContext); + Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); + } + + @Test + public void testTaskCompletionListener() { + initializeValidSearchContext(true); + RTFPerformanceAnalyzerSearchListener rtfSearchListener = + new RTFPerformanceAnalyzerSearchListener(controller); + + Mockito.when(shardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); + Mockito.when(task.getTotalResourceStats()).thenReturn(taskResourceUsage); + Mockito.when(taskResourceUsage.getCpuTimeInNanos()).thenReturn(10l); + + NotifyOnceListener taskCompletionListener = + rtfSearchListener.createListener(searchContext, 0l, "test", false); + taskCompletionListener.onResponse(task); + Mockito.verify(cpuUtilizationHistogram) + .record(Mockito.anyDouble(), Mockito.any(Tags.class)); + Mockito.verify(heapUsedHistogram).record(Mockito.anyDouble(), Mockito.any(Tags.class)); + } + + private void initializeValidSearchContext(boolean isValid) { + if (isValid) { + Mockito.when(searchContext.request()).thenReturn(shardSearchRequest); + Mockito.when(searchContext.getTask()).thenReturn(task); + Mockito.when(shardSearchRequest.shardId()).thenReturn(shardId); + } else { + Mockito.when(searchContext.request()).thenReturn(null); + } + } +} diff --git a/src/test/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandlerTests.java b/src/test/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandlerTests.java index 56dc289a..e14b778d 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandlerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandlerTests.java @@ -5,6 +5,7 @@ package org.opensearch.performanceanalyzer.transport; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.verify; import static org.mockito.MockitoAnnotations.initMocks; @@ -18,6 +19,7 @@ import org.opensearch.action.bulk.BulkShardRequest; import org.opensearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.performanceanalyzer.commons.util.Util; import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; import org.opensearch.performanceanalyzer.util.Utils; import org.opensearch.tasks.Task; @@ -67,4 +69,30 @@ public void testGetChannel() { TransportChannel actualChannel = handler.getChannel(concreteShardRequest, channel, task); assertTrue(actualChannel instanceof PerformanceAnalyzerTransportChannel); } + + @Test + public void testGetChannelIfRCAModeIsDisabled() { + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); + handler.getChannel(concreteShardRequest, channel, task); + + Mockito.when(bulkShardRequest.shardId()).thenReturn(shardId); + Mockito.when(bulkShardRequest.items()).thenReturn(new BulkItemRequest[1]); + TransportChannel actualChannel = handler.getChannel(concreteShardRequest, channel, task); + assertEquals(channel, actualChannel); + } + + @Test + public void testGetChannelIfDualModeIsEnabled() { + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.DUAL.getValue()); + concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); + handler.getChannel(concreteShardRequest, channel, task); + + Mockito.when(bulkShardRequest.shardId()).thenReturn(shardId); + Mockito.when(bulkShardRequest.items()).thenReturn(new BulkItemRequest[1]); + TransportChannel actualChannel = handler.getChannel(concreteShardRequest, channel, task); + assertTrue(actualChannel instanceof PerformanceAnalyzerTransportChannel); + } } diff --git a/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java new file mode 100644 index 00000000..aa4e425b --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java @@ -0,0 +1,82 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.transport; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.MockitoAnnotations.initMocks; + +import java.io.IOException; +import org.apache.commons.lang3.SystemUtils; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.core.transport.TransportResponse; +import org.opensearch.performanceanalyzer.util.Utils; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.tags.Tags; +import org.opensearch.transport.TransportChannel; + +public class RTFPerformanceAnalyzerTransportChannelTests { + private RTFPerformanceAnalyzerTransportChannel channel; + + @Mock private TransportChannel originalChannel; + @Mock private TransportResponse response; + @Mock private Histogram cpuUtilizationHistogram; + private ShardId shardId; + @Mock private ShardId mockedShardId; + @Mock private Index index; + + @Before + public void init() { + // this test only runs in Linux system + // as some of the static members of the ThreadList class are specific to Linux + org.junit.Assume.assumeTrue(SystemUtils.IS_OS_LINUX); + Utils.configureMetrics(); + initMocks(this); + String indexName = "testIndex"; + shardId = new ShardId(new Index(indexName, "uuid"), 1); + channel = new RTFPerformanceAnalyzerTransportChannel(); + channel.set(originalChannel, cpuUtilizationHistogram, indexName, shardId, false); + assertEquals("RTFPerformanceAnalyzerTransportChannelProfile", channel.getProfileName()); + assertEquals("RTFPerformanceAnalyzerTransportChannelType", channel.getChannelType()); + assertEquals(originalChannel, channel.getInnerChannel()); + } + + @Test + public void testResponse() throws IOException { + channel.sendResponse(response); + verify(originalChannel).sendResponse(response); + verify(cpuUtilizationHistogram, times(1)).record(anyDouble(), any(Tags.class)); + } + + @Test + public void testResponseWithException() throws IOException { + Exception exception = new Exception("dummy exception"); + channel.sendResponse(exception); + verify(originalChannel).sendResponse(exception); + verify(cpuUtilizationHistogram, times(1)).record(anyDouble(), any(Tags.class)); + } + + @Test + public void testRecordCPUUtilizationMetric() { + RTFPerformanceAnalyzerTransportChannel channel = + new RTFPerformanceAnalyzerTransportChannel(); + channel.set(originalChannel, cpuUtilizationHistogram, "testIndex", mockedShardId, false); + Mockito.when(mockedShardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); + channel.recordCPUUtilizationMetric(mockedShardId, 10l, "bulkShard", false); + Mockito.verify(cpuUtilizationHistogram) + .record(Mockito.anyDouble(), Mockito.any(Tags.class)); + } +} diff --git a/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandlerTests.java b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandlerTests.java new file mode 100644 index 00000000..7c4ed412 --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandlerTests.java @@ -0,0 +1,120 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.transport; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.verify; +import static org.mockito.MockitoAnnotations.initMocks; + +import org.apache.commons.lang3.SystemUtils; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.opensearch.action.bulk.BulkItemRequest; +import org.opensearch.action.bulk.BulkShardRequest; +import org.opensearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.commons.util.Util; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.performanceanalyzer.util.Utils; +import org.opensearch.tasks.Task; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +@SuppressWarnings("unchecked") +public class RTFPerformanceAnalyzerTransportRequestHandlerTests { + private RTFPerformanceAnalyzerTransportRequestHandler handler; + private ConcreteShardRequest concreteShardRequest; + + @Mock private TransportRequestHandler transportRequestHandler; + @Mock private PerformanceAnalyzerController controller; + @Mock private TransportChannel channel; + @Mock private TransportRequest request; + @Mock private BulkShardRequest bulkShardRequest; + @Mock private Task task; + @Mock private ShardId shardId; + @Mock private MetricsRegistry metricsRegistry; + + @Before + public void init() { + // this test only runs in Linux system + // as some of the static members of the ThreadList class are specific to Linux + org.junit.Assume.assumeTrue(SystemUtils.IS_OS_LINUX); + Utils.configureMetrics(); + initMocks(this); + handler = + new RTFPerformanceAnalyzerTransportRequestHandler( + transportRequestHandler, controller); + Mockito.when(controller.isPerformanceAnalyzerEnabled()).thenReturn(true); + } + + @Test + public void testMessageReceived() throws Exception { + handler.messageReceived(request, channel, task); + verify(transportRequestHandler).messageReceived(request, channel, task); + } + + @Test + public void testGetChannel() { + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); + handler.getChannel(concreteShardRequest, channel, task); + + Mockito.when(bulkShardRequest.shardId()).thenReturn(shardId); + Mockito.when(bulkShardRequest.items()).thenReturn(new BulkItemRequest[1]); + TransportChannel actualChannel = handler.getChannel(concreteShardRequest, channel, task); + assertTrue(actualChannel instanceof RTFPerformanceAnalyzerTransportChannel); + } + + @Test + public void testGetChannelTelemetryIsDisabled() { + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.RCA.getValue()); + concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); + handler.getChannel(concreteShardRequest, channel, task); + + Mockito.when(bulkShardRequest.shardId()).thenReturn(shardId); + Mockito.when(bulkShardRequest.items()).thenReturn(new BulkItemRequest[1]); + TransportChannel actualChannel = handler.getChannel(concreteShardRequest, channel, task); + assertEquals(channel, actualChannel); + } + + @Test + public void testGetChannelDualMode() { + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.DUAL.getValue()); + concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); + handler.getChannel(concreteShardRequest, channel, task); + + Mockito.when(bulkShardRequest.shardId()).thenReturn(shardId); + Mockito.when(bulkShardRequest.items()).thenReturn(new BulkItemRequest[1]); + TransportChannel actualChannel = handler.getChannel(concreteShardRequest, channel, task); + assertTrue(actualChannel instanceof RTFPerformanceAnalyzerTransportChannel); + } + + @Test + public void testGetChannelMetricRegistryIsNull() { + OpenSearchResources.INSTANCE.setMetricsRegistry(null); + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.RCA.getValue()); + concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); + handler.getChannel(concreteShardRequest, channel, task); + + Mockito.when(bulkShardRequest.shardId()).thenReturn(shardId); + Mockito.when(bulkShardRequest.items()).thenReturn(new BulkItemRequest[1]); + TransportChannel actualChannel = handler.getChannel(concreteShardRequest, channel, task); + assertEquals(channel, actualChannel); + } +} diff --git a/src/test/java/org/opensearch/performanceanalyzer/util/UtilsTests.java b/src/test/java/org/opensearch/performanceanalyzer/util/UtilsTests.java new file mode 100644 index 00000000..2ee91987 --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/util/UtilsTests.java @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.util; + +import org.junit.Assert; +import org.junit.Test; + +public class UtilsTests { + + @Test + public void testCPUUtilization() { + Assert.assertEquals(0.5, Utils.calculateCPUUtilization(2, 5, 5), 0.0); + Assert.assertEquals(1.0, Utils.calculateCPUUtilization(1, 5, 5), 0.0); + Assert.assertEquals( + Double.valueOf(10 / 15.0), Utils.calculateCPUUtilization(3, 5, 10), 0.0); + } + + @Test + public void testCPUUtilizationZeroValue() { + Assert.assertEquals(0.0, Utils.calculateCPUUtilization(2, 5, 0), 0.0); + Assert.assertEquals(0.0, Utils.calculateCPUUtilization(2, 0, 5), 0.0); + Assert.assertEquals(0.0, Utils.calculateCPUUtilization(0, 5, 5), 0.0); + } +}