Skip to content

Commit

Permalink
test changes
Browse files Browse the repository at this point in the history
Signed-off-by: Gagan Juneja <[email protected]>
  • Loading branch information
Gagan Juneja committed Jul 17, 2024
1 parent 6a2f94d commit ff9d087
Show file tree
Hide file tree
Showing 7 changed files with 252 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR;

import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.shard.SearchOperationListener;
Expand Down Expand Up @@ -37,7 +38,8 @@ public String toString() {
return PerformanceAnalyzerSearchListener.class.getSimpleName();
}

private SearchListener getSearchListener() {
@VisibleForTesting
SearchListener getSearchListener() {
return isSearchListenerEnabled() ? this : NO_OP_SEARCH_LISTENER;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR;

import com.sun.management.ThreadMXBean;
import java.lang.management.ManagementFactory;
import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -34,14 +34,11 @@ public class RTFPerformanceAnalyzerSearchListener
LogManager.getLogger(RTFPerformanceAnalyzerSearchListener.class);
private static final String OPERATION_SHARD_FETCH = "shard_fetch";
private static final String OPERATION_SHARD_QUERY = "shard_query";
public static final String QUERY_CPU_START_TIME = "query_cpu";
public static final String QUERY_START_TIME = "query_start_time";
public static final String FETCH_CPU_START_TIME = "fetch_cpu";
public static final String FETCH_START_TIME = "fetch_start_time";
private final ThreadLocal<Map<String, Long>> threadLocal = new ThreadLocal<>();
private final ThreadLocal<Map<String, Long>> threadLocal;
private static final SearchListener NO_OP_SEARCH_LISTENER = new NoOpSearchListener();
private static final ThreadMXBean threadMXBean =
(ThreadMXBean) ManagementFactory.getThreadMXBean();

private final long scClkTck;
private final PerformanceAnalyzerController controller;
private Histogram cpuUtilizationHistogram;
Expand All @@ -50,6 +47,7 @@ public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController
this.controller = controller;
this.scClkTck = OSGlobals.getScClkTck();
this.cpuUtilizationHistogram = createCPUUtilizationHistogram();
this.threadLocal = ThreadLocal.withInitial(() -> new HashMap<String, Long>());
}

private Histogram createCPUUtilizationHistogram() {
Expand All @@ -69,7 +67,8 @@ public String toString() {
return RTFPerformanceAnalyzerSearchListener.class.getSimpleName();
}

private SearchListener getSearchListener() {
@VisibleForTesting
SearchListener getSearchListener() {
return isSearchListenerEnabled() ? this : NO_OP_SEARCH_LISTENER;
}

Expand All @@ -81,7 +80,6 @@ private boolean isSearchListenerEnabled() {
== Util.CollectorMode.TELEMETRY.getValue());
}


@Override
public void onPreQueryPhase(SearchContext searchContext) {
try {
Expand Down Expand Up @@ -151,7 +149,7 @@ public void preQueryPhase(SearchContext searchContext) {
public void queryPhase(SearchContext searchContext, long tookInNanos) {
long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l);
addCPUResourceTrackingCompletionListener(
searchContext, queryStartTime, OPERATION_SHARD_FETCH, false);
searchContext, queryStartTime, OPERATION_SHARD_QUERY, false);
}

@Override
Expand Down Expand Up @@ -180,32 +178,21 @@ public void failedFetchPhase(SearchContext searchContext) {
searchContext, fetchStartTime, OPERATION_SHARD_FETCH, true);
}

private double calculateCPUUtilization(long phaseStartTime, long phaseCPUStartTime) {
long totalCpuTime =
Math.min(0, (threadMXBean.getCurrentThreadCpuTime() - phaseCPUStartTime));
return Utils.calculateCPUUtilization(
totalCpuTime, scClkTck, phaseStartTime - System.nanoTime());
}

private void recordCPUUtilizationMetric(
SearchContext searchContext, double cpuUtilization, String operation) {
cpuUtilizationHistogram.record(
cpuUtilization,
Tags.create()
.addTag(
RTFMetrics.CommonDimension.SHARD_ID.toString(),
searchContext.shardTarget().getShardId().getId())
.addTag(
RTFMetrics.CommonDimension.INDEX_NAME.toString(),
searchContext.shardTarget().getShardId().getIndex().getName())
.addTag(
RTFMetrics.CommonDimension.INDEX_UUID.toString(),
searchContext.shardTarget().getShardId().getIndex().getUUID())
.addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation));
}

private NotifyOnceListener<Task> addCPUResourceTrackingCompletionListener(
private void addCPUResourceTrackingCompletionListener(
SearchContext searchContext, long startTime, String operation, boolean isFailed) {
searchContext
.getTask()
.addResourceTrackingCompletionListener(
createListener(
searchContext,
(System.nanoTime() - startTime),
operation,
isFailed));
}

@VisibleForTesting
NotifyOnceListener<Task> createListener(
SearchContext searchContext, long totalTime, String operation, boolean isFailed) {
return new NotifyOnceListener<Task>() {
@Override
protected void innerOnResponse(Task task) {
Expand All @@ -214,27 +201,25 @@ protected void innerOnResponse(Task task) {
Utils.calculateCPUUtilization(
task.getTotalResourceStats().getCpuTimeInNanos(),
scClkTck,
(System.nanoTime() - startTime)),
totalTime),
Tags.create()
.addTag(
RTFMetrics.CommonDimension.INDEX_NAME.toString(),
searchContext.shardTarget().getShardId().getIndexName())
searchContext.request().shardId().getIndex().getName())
.addTag(
RTFMetrics.CommonDimension.INDEX_UUID.toString(),
searchContext
.shardTarget()
.getShardId()
.getIndex()
.getUUID())
searchContext.request().shardId().getIndex().getUUID())
.addTag(
RTFMetrics.CommonDimension.SHARD_ID.toString(),
searchContext.shardTarget().getShardId().getId())
searchContext.request().shardId().getId())
.addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation)
.addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed));
}

@Override
protected void innerOnFailure(Exception e) {}
protected void innerOnFailure(Exception e) {
LOG.error("Error is executing the the listener", e);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.performanceanalyzer.transport;

import com.google.common.annotations.VisibleForTesting;
import com.sun.management.ThreadMXBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;
Expand Down Expand Up @@ -93,7 +94,8 @@ private double calculateCPUUtilization(long phaseStartTime, long phaseCPUStartTi
totalCpuTime, scClkTck, phaseStartTime - System.nanoTime());
}

private void recordCPUUtilizationMetric(
@VisibleForTesting
void recordCPUUtilizationMetric(
ShardId shardId, double cpuUtilization, String operation, boolean isFailed) {
cpuUtilizationHistogram.record(
cpuUtilization,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration;
import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.commons.util.Util;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.performanceanalyzer.util.TestUtil;
import org.opensearch.performanceanalyzer.util.Utils;
Expand Down Expand Up @@ -69,6 +70,21 @@ public void init() {
TestUtil.readEvents();
}

@Test
public void tesSearchListener() {
Mockito.when(controller.getCollectorsSettingValue())
.thenReturn(Util.CollectorMode.TELEMETRY.getValue());
assertTrue(searchListener.getSearchListener() instanceof NoOpSearchListener);

Mockito.when(controller.getCollectorsSettingValue())
.thenReturn(Util.CollectorMode.RCA.getValue());
assertTrue(searchListener.getSearchListener() instanceof PerformanceAnalyzerSearchListener);

Mockito.when(controller.getCollectorsSettingValue())
.thenReturn(Util.CollectorMode.DUAL.getValue());
assertTrue(searchListener.getSearchListener() instanceof PerformanceAnalyzerSearchListener);
}

@Test
public void testGetMetricsPath() {
String expectedPath =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.performanceanalyzer.listener;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.MockitoAnnotations.initMocks;

import org.apache.commons.lang3.SystemUtils;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.core.action.NotifyOnceListener;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage;
import org.opensearch.performanceanalyzer.OpenSearchResources;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.commons.util.Util;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;

public class RTFPerformanceAnalyzerSearchListenerTests {
private static final long TOOK_IN_NANOS = 10;
private static final String EXCEPTION =
StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR.toString();

private RTFPerformanceAnalyzerSearchListener searchListener;

@Mock private SearchContext searchContext;
@Mock private ShardSearchRequest shardSearchRequest;
@Mock private ShardId shardId;
@Mock private PerformanceAnalyzerController controller;
@Mock private SearchShardTask task;
@Mock private MetricsRegistry metricsRegistry;
@Mock private Histogram histogram;
@Mock private Index index;

@Mock private TaskResourceUsage taskResourceUsage;

@BeforeClass
public static void setup() {
// this test only runs in Linux system
// as some of the static members of the ThreadList class are specific to Linux
org.junit.Assume.assumeTrue(SystemUtils.IS_OS_LINUX);
}

@Before
public void init() {
initMocks(this);
Mockito.when(controller.isPerformanceAnalyzerEnabled()).thenReturn(true);
searchListener = new RTFPerformanceAnalyzerSearchListener(controller);
OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry);
assertEquals(
RTFPerformanceAnalyzerSearchListener.class.getSimpleName(),
searchListener.toString());
}

@Test
public void tesSearchListener() {
Mockito.when(controller.getCollectorsSettingValue())
.thenReturn(Util.CollectorMode.TELEMETRY.getValue());
assertTrue(searchListener.getSearchListener() instanceof NoOpSearchListener);

Mockito.when(controller.getCollectorsSettingValue())
.thenReturn(Util.CollectorMode.RCA.getValue());
assertTrue(searchListener.getSearchListener() instanceof PerformanceAnalyzerSearchListener);

Mockito.when(controller.getCollectorsSettingValue())
.thenReturn(Util.CollectorMode.DUAL.getValue());
assertTrue(searchListener.getSearchListener() instanceof PerformanceAnalyzerSearchListener);
}

@Test
public void testQueryPhase() {
initializeValidSearchContext(true);
Mockito.when(controller.getCollectorsSettingValue())
.thenReturn(Util.CollectorMode.TELEMETRY.getValue());
Mockito.when(
metricsRegistry.createHistogram(
Mockito.eq("CPU_Utilization"),
Mockito.anyString(),
Mockito.eq("rate")))
.thenReturn(histogram);
searchListener.preQueryPhase(searchContext);
searchListener.queryPhase(searchContext, 0l);
Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any());
}

@Test
public void testQueryPhaseFailed() {
initializeValidSearchContext(true);
Mockito.when(controller.getCollectorsSettingValue())
.thenReturn(Util.CollectorMode.TELEMETRY.getValue());
Mockito.when(
metricsRegistry.createHistogram(
Mockito.eq("CPU_Utilization"),
Mockito.anyString(),
Mockito.eq("rate")))
.thenReturn(histogram);
searchListener.preQueryPhase(searchContext);
searchListener.failedQueryPhase(searchContext);
Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any());
}

@Test
public void testFetchPhase() {
initializeValidSearchContext(true);
Mockito.when(controller.getCollectorsSettingValue())
.thenReturn(Util.CollectorMode.TELEMETRY.getValue());
Mockito.when(
metricsRegistry.createHistogram(
Mockito.eq("CPU_Utilization"),
Mockito.anyString(),
Mockito.eq("rate")))
.thenReturn(histogram);
searchListener.preFetchPhase(searchContext);
searchListener.fetchPhase(searchContext, 0l);
Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any());
}

@Test
public void testFetchPhaseFailed() {
initializeValidSearchContext(true);
Mockito.when(controller.getCollectorsSettingValue())
.thenReturn(Util.CollectorMode.TELEMETRY.getValue());
Mockito.when(
metricsRegistry.createHistogram(
Mockito.eq("CPU_Utilization"),
Mockito.anyString(),
Mockito.eq("rate")))
.thenReturn(histogram);
searchListener.preFetchPhase(searchContext);
searchListener.failedFetchPhase(searchContext);
Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any());
}

@Test
public void testTaskCompletionListener() {
initializeValidSearchContext(true);
Mockito.when(
metricsRegistry.createHistogram(
Mockito.eq("CPU_Utilization"),
Mockito.anyString(),
Mockito.eq("rate")))
.thenReturn(histogram);
RTFPerformanceAnalyzerSearchListener rtfSearchListener =
new RTFPerformanceAnalyzerSearchListener(controller);

Mockito.when(shardId.getIndex()).thenReturn(index);
Mockito.when(index.getName()).thenReturn("myTestIndex");
Mockito.when(index.getUUID()).thenReturn("abc-def");
Mockito.when(task.getTotalResourceStats()).thenReturn(taskResourceUsage);
Mockito.when(taskResourceUsage.getCpuTimeInNanos()).thenReturn(10l);

NotifyOnceListener<Task> taskCompletionListener =
rtfSearchListener.createListener(searchContext, 0l, "test", false);
taskCompletionListener.onResponse(task);
Mockito.verify(histogram).record(Mockito.anyDouble(), Mockito.any(Tags.class));
}

private void initializeValidSearchContext(boolean isValid) {
if (isValid) {
Mockito.when(searchContext.request()).thenReturn(shardSearchRequest);
Mockito.when(shardSearchRequest.shardId()).thenReturn(shardId);
} else {
Mockito.when(searchContext.request()).thenReturn(null);
}
}
}
Loading

0 comments on commit ff9d087

Please sign in to comment.