From 34ebcc44fd24190e3822b325620e3ce4bba62ab8 Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Tue, 23 Jul 2024 19:41:33 +0530 Subject: [PATCH] Addresses the review comments Signed-off-by: Gagan Juneja --- .../config/PerformanceAnalyzerController.java | 2 +- .../PerformanceAnalyzerSearchListener.java | 4 +-- .../RTFPerformanceAnalyzerSearchListener.java | 32 ++++++++----------- ...rmanceAnalyzerTransportRequestHandler.java | 4 +-- ...rmanceAnalyzerTransportRequestHandler.java | 8 ++--- .../performanceanalyzer/util/Utils.java | 29 ++++++++++++++--- ...erformanceAnalyzerSearchListenerTests.java | 6 ++-- ...erformanceAnalyzerSearchListenerTests.java | 14 ++++---- ...eAnalyzerTransportRequestHandlerTests.java | 4 +-- ...eAnalyzerTransportRequestHandlerTests.java | 8 ++--- 10 files changed, 63 insertions(+), 48 deletions(-) diff --git a/src/main/java/org/opensearch/performanceanalyzer/config/PerformanceAnalyzerController.java b/src/main/java/org/opensearch/performanceanalyzer/config/PerformanceAnalyzerController.java index 74cc3e26..6915e0fe 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/config/PerformanceAnalyzerController.java +++ b/src/main/java/org/opensearch/performanceanalyzer/config/PerformanceAnalyzerController.java @@ -389,7 +389,7 @@ public boolean isCollectorDisabled( * * @return collectorsSettingValue */ - public int getCollectorsSettingValue() { + public int getCollectorsRunModeValue() { return collectorsSettingValue; } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java b/src/main/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java index f07415f8..99d46e3d 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java +++ b/src/main/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java @@ -45,8 +45,8 @@ SearchListener getSearchListener() { private boolean isSearchListenerEnabled() { return controller.isPerformanceAnalyzerEnabled() - && (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue() - || controller.getCollectorsSettingValue() + && (controller.getCollectorsRunModeValue() == Util.CollectorMode.DUAL.getValue() + || controller.getCollectorsRunModeValue() == Util.CollectorMode.RCA.getValue()); } diff --git a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java index 123d994c..a636444b 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java +++ b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java @@ -51,14 +51,15 @@ public class RTFPerformanceAnalyzerSearchListener public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController controller) { this.controller = controller; - this.cpuUtilizationHistogram = createCPUUtilizationHistogram(); - heapUsedHistogram = createHeapUsedHistogram(); + this.cpuUtilizationHistogram = + createCPUUtilizationHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry()); + this.heapUsedHistogram = + createHeapUsedHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry()); this.threadLocal = ThreadLocal.withInitial(() -> new HashMap()); this.numProcessors = Runtime.getRuntime().availableProcessors(); } - private Histogram createCPUUtilizationHistogram() { - MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + private Histogram createCPUUtilizationHistogram(MetricsRegistry metricsRegistry) { if (metricsRegistry != null) { return metricsRegistry.createHistogram( RTFMetrics.OSMetrics.CPU_UTILIZATION.toString(), @@ -70,8 +71,7 @@ private Histogram createCPUUtilizationHistogram() { } } - private Histogram createHeapUsedHistogram() { - MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + private Histogram createHeapUsedHistogram(MetricsRegistry metricsRegistry) { if (metricsRegistry != null) { return metricsRegistry.createHistogram( RTFMetrics.OSMetrics.HEAP_ALLOCATED.toString(), @@ -94,14 +94,10 @@ SearchListener getSearchListener() { } private boolean isSearchListenerEnabled() { - LOG.debug( - "Controller enable status {}, CollectorMode value {}", - controller.isPerformanceAnalyzerEnabled(), - controller.getCollectorsSettingValue()); return OpenSearchResources.INSTANCE.getMetricsRegistry() != null && controller.isPerformanceAnalyzerEnabled() - && (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue() - || controller.getCollectorsSettingValue() + && (controller.getCollectorsRunModeValue() == Util.CollectorMode.DUAL.getValue() + || controller.getCollectorsRunModeValue() == Util.CollectorMode.TELEMETRY.getValue()); } @@ -173,7 +169,7 @@ public void preQueryPhase(SearchContext searchContext) { @Override public void queryPhase(SearchContext searchContext, long tookInNanos) { - long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l); + long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime()); long queryTime = (System.nanoTime() - queryStartTime); addResourceTrackingCompletionListener( searchContext, queryStartTime, queryTime, OPERATION_SHARD_QUERY, false); @@ -181,7 +177,7 @@ public void queryPhase(SearchContext searchContext, long tookInNanos) { @Override public void failedQueryPhase(SearchContext searchContext) { - long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l); + long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime()); long queryTime = (System.nanoTime() - queryStartTime); addResourceTrackingCompletionListener( searchContext, queryStartTime, queryTime, OPERATION_SHARD_QUERY, true); @@ -194,7 +190,7 @@ public void preFetchPhase(SearchContext searchContext) { @Override public void fetchPhase(SearchContext searchContext, long tookInNanos) { - long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, 0l); + long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, System.nanoTime()); long fetchTime = (System.nanoTime() - fetchStartTime); addResourceTrackingCompletionListenerForFetchPhase( searchContext, fetchStartTime, fetchTime, OPERATION_SHARD_FETCH, false); @@ -202,7 +198,7 @@ public void fetchPhase(SearchContext searchContext, long tookInNanos) { @Override public void failedFetchPhase(SearchContext searchContext) { - long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, 0l); + long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, System.nanoTime()); long fetchTime = (System.nanoTime() - fetchStartTime); addResourceTrackingCompletionListenerForFetchPhase( searchContext, fetchStartTime, fetchTime, OPERATION_SHARD_FETCH, true); @@ -226,12 +222,12 @@ private void addResourceTrackingCompletionListenerForFetchPhase( long overallStartTime = fetchStartTime; long queryTaskId = threadLocal.get().getOrDefault(QUERY_TASK_ID, 0l); /** - * There are scenarios where both query and fetch pahses run in the same task for an + * There are scenarios where both query and fetch phases run in the same task for an * optimization. Adding a special handling for that case to divide the CPU usage between * these 2 operations by their runTime. */ if (queryTaskId == searchContext.getTask().getId()) { - overallStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l); + overallStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime()); } addCompletionListener(searchContext, overallStartTime, fetchTime, operation, isFailed); } diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java b/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java index ad517c74..64052ab1 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java @@ -59,8 +59,8 @@ TransportChannel getChannel(T request, TransportChannel channel, Task task) { private boolean isCollectorEnabled() { return controller.isPerformanceAnalyzerEnabled() - && (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue() - || controller.getCollectorsSettingValue() + && (controller.getCollectorsRunModeValue() == Util.CollectorMode.DUAL.getValue() + || controller.getCollectorsRunModeValue() == Util.CollectorMode.RCA.getValue()); } diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java index c1e5b55f..82a0abe6 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java @@ -36,7 +36,7 @@ public final class RTFPerformanceAnalyzerTransportRequestHandler actualHandler; - boolean logOnce = false; + private boolean logOnce = false; private final Histogram cpuUtilizationHistogram; RTFPerformanceAnalyzerTransportRequestHandler( @@ -52,7 +52,7 @@ private Histogram createCPUUtilizationHistogram() { return metricsRegistry.createHistogram( RTFMetrics.OSMetrics.CPU_UTILIZATION.toString(), "CPU Utilization per shard for an operation", - "rate"); + RTFMetrics.MetricUnits.RATE.toString()); } else { return null; } @@ -79,8 +79,8 @@ TransportChannel getChannel(T request, TransportChannel channel, Task task) { private boolean isCollectorEnabled() { return OpenSearchResources.INSTANCE.getMetricsRegistry() != null && controller.isPerformanceAnalyzerEnabled() - && (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue() - || controller.getCollectorsSettingValue() + && (controller.getCollectorsRunModeValue() == Util.CollectorMode.DUAL.getValue() + || controller.getCollectorsRunModeValue() == Util.CollectorMode.TELEMETRY.getValue()); } diff --git a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java index 97eeb460..ebf440eb 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java +++ b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java @@ -112,18 +112,37 @@ public static HashMap getShards() { IndexShardState.POST_RECOVERY, IndexShardState.STARTED); + /** + * CPU Utilization is the time spend in CPU cycles divide by the total time cpu available time. + * Total cpu available time would be the multiplication of num of processors and the process + * time. It also takes into account the cpuShareFactor in case some adjustments are needed. + * + * @param numProcessors + * @param totalOperationTime + * @param cpuUsageTime + * @param cpuShareFactor + * @return + */ public static double calculateCPUUtilization( int numProcessors, long totalOperationTime, long cpuUsageTime, double cpuShareFactor) { - LOG.debug("CPUUtilization calculation - numProcessors {}", numProcessors); - LOG.debug("CPUUtilization calculation - cpuShareFactor {}", cpuShareFactor); - LOG.debug("CPUUtilization calculation - totalCpuTime {}", cpuUsageTime); - LOG.debug("CPUUtilization calculation - totalOperationTime {}", totalOperationTime); + LOG.debug( + "Performance Analyzer CPUUtilization calculation with numProcessors: {}", + numProcessors); + LOG.debug( + "Performance Analyzer CPUUtilization calculation with cpuShareFactor {}", + cpuShareFactor); + LOG.debug( + "Performance Analyzer CPUUtilization calculation with totalCpuTime {}", + cpuUsageTime); + LOG.debug( + "Performance Analyzer CPUUtilization calculation with totalOperationTime {}", + totalOperationTime); if (totalOperationTime == 0l || cpuUsageTime == 0l || numProcessors == 0) { return 0.0d; } double totalAvailableCPUTime = Double.valueOf(totalOperationTime * numProcessors); double cpuUtil = cpuShareFactor * (cpuUsageTime / totalAvailableCPUTime); - LOG.debug("CPUUtilization calculation - cpuUtil {}", cpuUtil); + LOG.debug("Performance Analyzer CPUUtilization calculation with cpuUtil {}", cpuUtil); return cpuUtil; } } diff --git a/src/test/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListenerTests.java b/src/test/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListenerTests.java index fdd3aa49..f7204783 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListenerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListenerTests.java @@ -72,15 +72,15 @@ public void init() { @Test public void tesSearchListener() { - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); assertTrue(searchListener.getSearchListener() instanceof NoOpSearchListener); - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.RCA.getValue()); assertTrue(searchListener.getSearchListener() instanceof PerformanceAnalyzerSearchListener); - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.DUAL.getValue()); assertTrue(searchListener.getSearchListener() instanceof PerformanceAnalyzerSearchListener); } diff --git a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java index 0c19492a..16aba4bc 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java @@ -76,16 +76,16 @@ public void init() { @Test public void tesSearchListener() { - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.RCA.getValue()); assertTrue(searchListener.getSearchListener() instanceof NoOpSearchListener); - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); assertTrue( searchListener.getSearchListener() instanceof RTFPerformanceAnalyzerSearchListener); - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.DUAL.getValue()); assertTrue( searchListener.getSearchListener() instanceof RTFPerformanceAnalyzerSearchListener); @@ -94,7 +94,7 @@ public void tesSearchListener() { @Test public void testQueryPhase() { initializeValidSearchContext(true); - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); searchListener.preQueryPhase(searchContext); searchListener.queryPhase(searchContext, 0l); @@ -104,7 +104,7 @@ public void testQueryPhase() { @Test public void testQueryPhaseFailed() { initializeValidSearchContext(true); - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); searchListener.preQueryPhase(searchContext); searchListener.failedQueryPhase(searchContext); @@ -114,7 +114,7 @@ public void testQueryPhaseFailed() { @Test public void testFetchPhase() { initializeValidSearchContext(true); - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); searchListener.preFetchPhase(searchContext); searchListener.fetchPhase(searchContext, 0l); @@ -124,7 +124,7 @@ public void testFetchPhase() { @Test public void testFetchPhaseFailed() { initializeValidSearchContext(true); - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); searchListener.preFetchPhase(searchContext); searchListener.failedFetchPhase(searchContext); diff --git a/src/test/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandlerTests.java b/src/test/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandlerTests.java index e14b778d..099390fa 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandlerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandlerTests.java @@ -72,7 +72,7 @@ public void testGetChannel() { @Test public void testGetChannelIfRCAModeIsDisabled() { - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); handler.getChannel(concreteShardRequest, channel, task); @@ -85,7 +85,7 @@ public void testGetChannelIfRCAModeIsDisabled() { @Test public void testGetChannelIfDualModeIsEnabled() { - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.DUAL.getValue()); concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); handler.getChannel(concreteShardRequest, channel, task); diff --git a/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandlerTests.java b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandlerTests.java index 7c4ed412..9a0f9d0e 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandlerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandlerTests.java @@ -65,7 +65,7 @@ public void testMessageReceived() throws Exception { @Test public void testGetChannel() { OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); handler.getChannel(concreteShardRequest, channel, task); @@ -79,7 +79,7 @@ public void testGetChannel() { @Test public void testGetChannelTelemetryIsDisabled() { OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.RCA.getValue()); concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); handler.getChannel(concreteShardRequest, channel, task); @@ -93,7 +93,7 @@ public void testGetChannelTelemetryIsDisabled() { @Test public void testGetChannelDualMode() { OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.DUAL.getValue()); concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); handler.getChannel(concreteShardRequest, channel, task); @@ -107,7 +107,7 @@ public void testGetChannelDualMode() { @Test public void testGetChannelMetricRegistryIsNull() { OpenSearchResources.INSTANCE.setMetricsRegistry(null); - Mockito.when(controller.getCollectorsSettingValue()) + Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.RCA.getValue()); concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1); handler.getChannel(concreteShardRequest, channel, task);