From ff9d0872afbe3087dc2653113045b72ca5cafabb Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Wed, 17 Jul 2024 16:04:53 +0530 Subject: [PATCH] test changes Signed-off-by: Gagan Juneja --- .../PerformanceAnalyzerSearchListener.java | 4 +- .../RTFPerformanceAnalyzerSearchListener.java | 73 +++---- ...TFPerformanceAnalyzerTransportChannel.java | 4 +- ...erformanceAnalyzerSearchListenerTests.java | 16 ++ ...erformanceAnalyzerSearchListenerTests.java | 181 ++++++++++++++++++ ...formanceAnalyzerTransportChannelTests.java | 19 +- ...eAnalyzerTransportRequestHandlerTests.java | 3 +- 7 files changed, 252 insertions(+), 48 deletions(-) create mode 100644 src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java diff --git a/src/main/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java b/src/main/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java index fb26e73e..f07415f8 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java +++ b/src/main/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java @@ -7,6 +7,7 @@ import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR; +import com.google.common.annotations.VisibleForTesting; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.index.shard.SearchOperationListener; @@ -37,7 +38,8 @@ public String toString() { return PerformanceAnalyzerSearchListener.class.getSimpleName(); } - private SearchListener getSearchListener() { + @VisibleForTesting + SearchListener getSearchListener() { return isSearchListenerEnabled() ? this : NO_OP_SEARCH_LISTENER; } diff --git a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java index 257e078d..c2ddc051 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java +++ b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java @@ -7,8 +7,8 @@ import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR; -import com.sun.management.ThreadMXBean; -import java.lang.management.ManagementFactory; +import com.google.common.annotations.VisibleForTesting; +import java.util.HashMap; import java.util.Map; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -34,14 +34,11 @@ public class RTFPerformanceAnalyzerSearchListener LogManager.getLogger(RTFPerformanceAnalyzerSearchListener.class); private static final String OPERATION_SHARD_FETCH = "shard_fetch"; private static final String OPERATION_SHARD_QUERY = "shard_query"; - public static final String QUERY_CPU_START_TIME = "query_cpu"; public static final String QUERY_START_TIME = "query_start_time"; - public static final String FETCH_CPU_START_TIME = "fetch_cpu"; public static final String FETCH_START_TIME = "fetch_start_time"; - private final ThreadLocal> threadLocal = new ThreadLocal<>(); + private final ThreadLocal> threadLocal; private static final SearchListener NO_OP_SEARCH_LISTENER = new NoOpSearchListener(); - private static final ThreadMXBean threadMXBean = - (ThreadMXBean) ManagementFactory.getThreadMXBean(); + private final long scClkTck; private final PerformanceAnalyzerController controller; private Histogram cpuUtilizationHistogram; @@ -50,6 +47,7 @@ public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController this.controller = controller; this.scClkTck = OSGlobals.getScClkTck(); this.cpuUtilizationHistogram = createCPUUtilizationHistogram(); + this.threadLocal = ThreadLocal.withInitial(() -> new HashMap()); } private Histogram createCPUUtilizationHistogram() { @@ -69,7 +67,8 @@ public String toString() { return RTFPerformanceAnalyzerSearchListener.class.getSimpleName(); } - private SearchListener getSearchListener() { + @VisibleForTesting + SearchListener getSearchListener() { return isSearchListenerEnabled() ? this : NO_OP_SEARCH_LISTENER; } @@ -81,7 +80,6 @@ private boolean isSearchListenerEnabled() { == Util.CollectorMode.TELEMETRY.getValue()); } - @Override public void onPreQueryPhase(SearchContext searchContext) { try { @@ -151,7 +149,7 @@ public void preQueryPhase(SearchContext searchContext) { public void queryPhase(SearchContext searchContext, long tookInNanos) { long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l); addCPUResourceTrackingCompletionListener( - searchContext, queryStartTime, OPERATION_SHARD_FETCH, false); + searchContext, queryStartTime, OPERATION_SHARD_QUERY, false); } @Override @@ -180,32 +178,21 @@ public void failedFetchPhase(SearchContext searchContext) { searchContext, fetchStartTime, OPERATION_SHARD_FETCH, true); } - private double calculateCPUUtilization(long phaseStartTime, long phaseCPUStartTime) { - long totalCpuTime = - Math.min(0, (threadMXBean.getCurrentThreadCpuTime() - phaseCPUStartTime)); - return Utils.calculateCPUUtilization( - totalCpuTime, scClkTck, phaseStartTime - System.nanoTime()); - } - - private void recordCPUUtilizationMetric( - SearchContext searchContext, double cpuUtilization, String operation) { - cpuUtilizationHistogram.record( - cpuUtilization, - Tags.create() - .addTag( - RTFMetrics.CommonDimension.SHARD_ID.toString(), - searchContext.shardTarget().getShardId().getId()) - .addTag( - RTFMetrics.CommonDimension.INDEX_NAME.toString(), - searchContext.shardTarget().getShardId().getIndex().getName()) - .addTag( - RTFMetrics.CommonDimension.INDEX_UUID.toString(), - searchContext.shardTarget().getShardId().getIndex().getUUID()) - .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation)); - } - - private NotifyOnceListener addCPUResourceTrackingCompletionListener( + private void addCPUResourceTrackingCompletionListener( SearchContext searchContext, long startTime, String operation, boolean isFailed) { + searchContext + .getTask() + .addResourceTrackingCompletionListener( + createListener( + searchContext, + (System.nanoTime() - startTime), + operation, + isFailed)); + } + + @VisibleForTesting + NotifyOnceListener createListener( + SearchContext searchContext, long totalTime, String operation, boolean isFailed) { return new NotifyOnceListener() { @Override protected void innerOnResponse(Task task) { @@ -214,27 +201,25 @@ protected void innerOnResponse(Task task) { Utils.calculateCPUUtilization( task.getTotalResourceStats().getCpuTimeInNanos(), scClkTck, - (System.nanoTime() - startTime)), + totalTime), Tags.create() .addTag( RTFMetrics.CommonDimension.INDEX_NAME.toString(), - searchContext.shardTarget().getShardId().getIndexName()) + searchContext.request().shardId().getIndex().getName()) .addTag( RTFMetrics.CommonDimension.INDEX_UUID.toString(), - searchContext - .shardTarget() - .getShardId() - .getIndex() - .getUUID()) + searchContext.request().shardId().getIndex().getUUID()) .addTag( RTFMetrics.CommonDimension.SHARD_ID.toString(), - searchContext.shardTarget().getShardId().getId()) + searchContext.request().shardId().getId()) .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation) .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed)); } @Override - protected void innerOnFailure(Exception e) {} + protected void innerOnFailure(Exception e) { + LOG.error("Error is executing the the listener", e); + } }; } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java index b784f9a3..e1c5ab43 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java @@ -5,6 +5,7 @@ package org.opensearch.performanceanalyzer.transport; +import com.google.common.annotations.VisibleForTesting; import com.sun.management.ThreadMXBean; import java.io.IOException; import java.lang.management.ManagementFactory; @@ -93,7 +94,8 @@ private double calculateCPUUtilization(long phaseStartTime, long phaseCPUStartTi totalCpuTime, scClkTck, phaseStartTime - System.nanoTime()); } - private void recordCPUUtilizationMetric( + @VisibleForTesting + void recordCPUUtilizationMetric( ShardId shardId, double cpuUtilization, String operation, boolean isFailed) { cpuUtilizationHistogram.record( cpuUtilization, diff --git a/src/test/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListenerTests.java b/src/test/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListenerTests.java index cfa8bf58..fdd3aa49 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListenerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListenerTests.java @@ -25,6 +25,7 @@ import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration; import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics; import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode; +import org.opensearch.performanceanalyzer.commons.util.Util; import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; import org.opensearch.performanceanalyzer.util.TestUtil; import org.opensearch.performanceanalyzer.util.Utils; @@ -69,6 +70,21 @@ public void init() { TestUtil.readEvents(); } + @Test + public void tesSearchListener() { + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + assertTrue(searchListener.getSearchListener() instanceof NoOpSearchListener); + + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.RCA.getValue()); + assertTrue(searchListener.getSearchListener() instanceof PerformanceAnalyzerSearchListener); + + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.DUAL.getValue()); + assertTrue(searchListener.getSearchListener() instanceof PerformanceAnalyzerSearchListener); + } + @Test public void testGetMetricsPath() { String expectedPath = diff --git a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java new file mode 100644 index 00000000..ae7b496b --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java @@ -0,0 +1,181 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.listener; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.MockitoAnnotations.initMocks; + +import org.apache.commons.lang3.SystemUtils; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.core.action.NotifyOnceListener; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode; +import org.opensearch.performanceanalyzer.commons.util.Util; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.search.internal.SearchContext; +import org.opensearch.search.internal.ShardSearchRequest; +import org.opensearch.tasks.Task; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; + +public class RTFPerformanceAnalyzerSearchListenerTests { + private static final long TOOK_IN_NANOS = 10; + private static final String EXCEPTION = + StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR.toString(); + + private RTFPerformanceAnalyzerSearchListener searchListener; + + @Mock private SearchContext searchContext; + @Mock private ShardSearchRequest shardSearchRequest; + @Mock private ShardId shardId; + @Mock private PerformanceAnalyzerController controller; + @Mock private SearchShardTask task; + @Mock private MetricsRegistry metricsRegistry; + @Mock private Histogram histogram; + @Mock private Index index; + + @Mock private TaskResourceUsage taskResourceUsage; + + @BeforeClass + public static void setup() { + // this test only runs in Linux system + // as some of the static members of the ThreadList class are specific to Linux + org.junit.Assume.assumeTrue(SystemUtils.IS_OS_LINUX); + } + + @Before + public void init() { + initMocks(this); + Mockito.when(controller.isPerformanceAnalyzerEnabled()).thenReturn(true); + searchListener = new RTFPerformanceAnalyzerSearchListener(controller); + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); + assertEquals( + RTFPerformanceAnalyzerSearchListener.class.getSimpleName(), + searchListener.toString()); + } + + @Test + public void tesSearchListener() { + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + assertTrue(searchListener.getSearchListener() instanceof NoOpSearchListener); + + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.RCA.getValue()); + assertTrue(searchListener.getSearchListener() instanceof PerformanceAnalyzerSearchListener); + + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.DUAL.getValue()); + assertTrue(searchListener.getSearchListener() instanceof PerformanceAnalyzerSearchListener); + } + + @Test + public void testQueryPhase() { + initializeValidSearchContext(true); + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + Mockito.when( + metricsRegistry.createHistogram( + Mockito.eq("CPU_Utilization"), + Mockito.anyString(), + Mockito.eq("rate"))) + .thenReturn(histogram); + searchListener.preQueryPhase(searchContext); + searchListener.queryPhase(searchContext, 0l); + Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); + } + + @Test + public void testQueryPhaseFailed() { + initializeValidSearchContext(true); + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + Mockito.when( + metricsRegistry.createHistogram( + Mockito.eq("CPU_Utilization"), + Mockito.anyString(), + Mockito.eq("rate"))) + .thenReturn(histogram); + searchListener.preQueryPhase(searchContext); + searchListener.failedQueryPhase(searchContext); + Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); + } + + @Test + public void testFetchPhase() { + initializeValidSearchContext(true); + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + Mockito.when( + metricsRegistry.createHistogram( + Mockito.eq("CPU_Utilization"), + Mockito.anyString(), + Mockito.eq("rate"))) + .thenReturn(histogram); + searchListener.preFetchPhase(searchContext); + searchListener.fetchPhase(searchContext, 0l); + Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); + } + + @Test + public void testFetchPhaseFailed() { + initializeValidSearchContext(true); + Mockito.when(controller.getCollectorsSettingValue()) + .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + Mockito.when( + metricsRegistry.createHistogram( + Mockito.eq("CPU_Utilization"), + Mockito.anyString(), + Mockito.eq("rate"))) + .thenReturn(histogram); + searchListener.preFetchPhase(searchContext); + searchListener.failedFetchPhase(searchContext); + Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); + } + + @Test + public void testTaskCompletionListener() { + initializeValidSearchContext(true); + Mockito.when( + metricsRegistry.createHistogram( + Mockito.eq("CPU_Utilization"), + Mockito.anyString(), + Mockito.eq("rate"))) + .thenReturn(histogram); + RTFPerformanceAnalyzerSearchListener rtfSearchListener = + new RTFPerformanceAnalyzerSearchListener(controller); + + Mockito.when(shardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); + Mockito.when(task.getTotalResourceStats()).thenReturn(taskResourceUsage); + Mockito.when(taskResourceUsage.getCpuTimeInNanos()).thenReturn(10l); + + NotifyOnceListener taskCompletionListener = + rtfSearchListener.createListener(searchContext, 0l, "test", false); + taskCompletionListener.onResponse(task); + Mockito.verify(histogram).record(Mockito.anyDouble(), Mockito.any(Tags.class)); + } + + private void initializeValidSearchContext(boolean isValid) { + if (isValid) { + Mockito.when(searchContext.request()).thenReturn(shardSearchRequest); + Mockito.when(shardSearchRequest.shardId()).thenReturn(shardId); + } else { + Mockito.when(searchContext.request()).thenReturn(null); + } + } +} diff --git a/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java index 966eacce..aa4e425b 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java @@ -13,9 +13,11 @@ import static org.mockito.MockitoAnnotations.initMocks; import java.io.IOException; +import org.apache.commons.lang3.SystemUtils; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; +import org.mockito.Mockito; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.transport.TransportResponse; @@ -31,12 +33,14 @@ public class RTFPerformanceAnalyzerTransportChannelTests { @Mock private TransportResponse response; @Mock private Histogram cpuUtilizationHistogram; private ShardId shardId; + @Mock private ShardId mockedShardId; + @Mock private Index index; @Before public void init() { // this test only runs in Linux system // as some of the static members of the ThreadList class are specific to Linux - // org.junit.Assume.assumeTrue(SystemUtils.IS_OS_LINUX); + org.junit.Assume.assumeTrue(SystemUtils.IS_OS_LINUX); Utils.configureMetrics(); initMocks(this); String indexName = "testIndex"; @@ -62,4 +66,17 @@ public void testResponseWithException() throws IOException { verify(originalChannel).sendResponse(exception); verify(cpuUtilizationHistogram, times(1)).record(anyDouble(), any(Tags.class)); } + + @Test + public void testRecordCPUUtilizationMetric() { + RTFPerformanceAnalyzerTransportChannel channel = + new RTFPerformanceAnalyzerTransportChannel(); + channel.set(originalChannel, cpuUtilizationHistogram, "testIndex", mockedShardId, false); + Mockito.when(mockedShardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); + channel.recordCPUUtilizationMetric(mockedShardId, 10l, "bulkShard", false); + Mockito.verify(cpuUtilizationHistogram) + .record(Mockito.anyDouble(), Mockito.any(Tags.class)); + } } diff --git a/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandlerTests.java b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandlerTests.java index e838e95f..7c4ed412 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandlerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandlerTests.java @@ -10,6 +10,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.MockitoAnnotations.initMocks; +import org.apache.commons.lang3.SystemUtils; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; @@ -46,7 +47,7 @@ public class RTFPerformanceAnalyzerTransportRequestHandlerTests { public void init() { // this test only runs in Linux system // as some of the static members of the ThreadList class are specific to Linux - // org.junit.Assume.assumeTrue(SystemUtils.IS_OS_LINUX); + org.junit.Assume.assumeTrue(SystemUtils.IS_OS_LINUX); Utils.configureMetrics(); initMocks(this); handler =