Skip to content

Commit

Permalink
test changes
Browse files Browse the repository at this point in the history
Signed-off-by: Gagan Juneja <[email protected]>
  • Loading branch information
Gagan Juneja committed Jul 19, 2024
1 parent 1a3b223 commit 7b2e1e8
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.opensearch.performanceanalyzer.OpenSearchResources;
import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics;
import org.opensearch.performanceanalyzer.commons.os.OSGlobals;
import org.opensearch.performanceanalyzer.commons.util.Util;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.performanceanalyzer.util.Utils;
Expand All @@ -39,15 +38,17 @@ public class RTFPerformanceAnalyzerSearchListener
private final ThreadLocal<Map<String, Long>> threadLocal;
private static final SearchListener NO_OP_SEARCH_LISTENER = new NoOpSearchListener();

private final long scClkTck;
private final PerformanceAnalyzerController controller;
private Histogram cpuUtilizationHistogram;
private final Histogram cpuUtilizationHistogram;
private final Histogram heapUsedHistogram;
private final int numProcessors;

public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController controller) {
this.controller = controller;
this.scClkTck = OSGlobals.getScClkTck();
this.cpuUtilizationHistogram = createCPUUtilizationHistogram();
heapUsedHistogram = createHeapUsedHistogram();
this.threadLocal = ThreadLocal.withInitial(() -> new HashMap<String, Long>());
this.numProcessors = Runtime.getRuntime().availableProcessors();
}

private Histogram createCPUUtilizationHistogram() {
Expand All @@ -63,6 +64,19 @@ private Histogram createCPUUtilizationHistogram() {
}
}

private Histogram createHeapUsedHistogram() {
MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry();
if (metricsRegistry != null) {
return metricsRegistry.createHistogram(
RTFMetrics.HeapValue.HEAP_USED.toString(),
"Heap used per shard for an operation",
"bytes");
} else {
LOG.debug("MetricsRegistry is null");
return null;
}
}

@Override
public String toString() {
return RTFPerformanceAnalyzerSearchListener.class.getSimpleName();
Expand All @@ -74,7 +88,10 @@ SearchListener getSearchListener() {
}

private boolean isSearchListenerEnabled() {
LOG.debug("Controller enable status {}, CollectorMode value {}", controller.isPerformanceAnalyzerEnabled(), controller.getCollectorsSettingValue());
LOG.debug(
"Controller enable status {}, CollectorMode value {}",
controller.isPerformanceAnalyzerEnabled(),
controller.getCollectorsSettingValue());
return cpuUtilizationHistogram != null
&& controller.isPerformanceAnalyzerEnabled()
&& (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue()
Expand Down Expand Up @@ -150,14 +167,14 @@ public void preQueryPhase(SearchContext searchContext) {
@Override
public void queryPhase(SearchContext searchContext, long tookInNanos) {
long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l);
addCPUResourceTrackingCompletionListener(
addResourceTrackingCompletionListener(
searchContext, queryStartTime, OPERATION_SHARD_QUERY, false);
}

@Override
public void failedQueryPhase(SearchContext searchContext) {
long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l);
addCPUResourceTrackingCompletionListener(
addResourceTrackingCompletionListener(
searchContext, queryStartTime, OPERATION_SHARD_QUERY, true);
}

Expand All @@ -169,18 +186,18 @@ public void preFetchPhase(SearchContext searchContext) {
@Override
public void fetchPhase(SearchContext searchContext, long tookInNanos) {
long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, 0l);
addCPUResourceTrackingCompletionListener(
addResourceTrackingCompletionListener(
searchContext, fetchStartTime, OPERATION_SHARD_FETCH, false);
}

@Override
public void failedFetchPhase(SearchContext searchContext) {
long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, 0l);
addCPUResourceTrackingCompletionListener(
addResourceTrackingCompletionListener(
searchContext, fetchStartTime, OPERATION_SHARD_FETCH, true);
}

private void addCPUResourceTrackingCompletionListener(
private void addResourceTrackingCompletionListener(
SearchContext searchContext, long startTime, String operation, boolean isFailed) {
searchContext
.getTask()
Expand All @@ -201,21 +218,27 @@ protected void innerOnResponse(Task task) {
LOG.debug("Updating the counter for task {}", task.getId());
cpuUtilizationHistogram.record(
Utils.calculateCPUUtilization(
task.getTotalResourceStats().getCpuTimeInNanos(),
scClkTck,
totalTime),
Tags.create()
.addTag(
RTFMetrics.CommonDimension.INDEX_NAME.toString(),
searchContext.request().shardId().getIndex().getName())
.addTag(
RTFMetrics.CommonDimension.INDEX_UUID.toString(),
searchContext.request().shardId().getIndex().getUUID())
.addTag(
RTFMetrics.CommonDimension.SHARD_ID.toString(),
searchContext.request().shardId().getId())
.addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation)
.addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed));
numProcessors,
totalTime,
task.getTotalResourceStats().getCpuTimeInNanos()),
createTags());
heapUsedHistogram.record(
Math.max(0, task.getTotalResourceStats().getMemoryInBytes()), createTags());
}

private Tags createTags() {
return Tags.create()
.addTag(
RTFMetrics.CommonDimension.INDEX_NAME.toString(),
searchContext.request().shardId().getIndex().getName())
.addTag(
RTFMetrics.CommonDimension.INDEX_UUID.toString(),
searchContext.request().shardId().getIndex().getUUID())
.addTag(
RTFMetrics.CommonDimension.SHARD_ID.toString(),
searchContext.request().shardId().getId())
.addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation)
.addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public final class RTFPerformanceAnalyzerTransportChannel implements TransportCh
private boolean primary;

private long threadID;
private int numProcessors;

void set(
TransportChannel original,
Expand All @@ -59,6 +60,7 @@ void set(
this.operationStartTime = System.nanoTime();
threadID = Thread.currentThread().getId();
this.cpuStartTime = threadMXBean.getThreadCpuTime(threadID);
this.numProcessors = Runtime.getRuntime().availableProcessors();
LOG.info("111 - Thread Name {}", Thread.currentThread().getName());
this.scClkTck = OSGlobals.getScClkTck();
}
Expand Down Expand Up @@ -96,7 +98,7 @@ private double calculateCPUUtilization(long phaseStartTime, long phaseCPUStartTi
long totalCpuTime =
Math.max(0, (threadMXBean.getThreadCpuTime(threadID) - phaseCPUStartTime));
return Utils.calculateCPUUtilization(
totalCpuTime, scClkTck, System.nanoTime() - phaseStartTime);
numProcessors, (System.nanoTime() - phaseStartTime), totalCpuTime);
}

@VisibleForTesting
Expand Down
15 changes: 10 additions & 5 deletions src/main/java/org/opensearch/performanceanalyzer/util/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,15 @@ public static HashMap<ShardId, IndexShard> getShards() {
IndexShardState.POST_RECOVERY,
IndexShardState.STARTED);

public static double calculateCPUUtilization(long totalCpuTime, long scClkTck, long totalTime) {
LOG.debug("totalCpuTime {}", totalCpuTime);
LOG.debug("scClkTck {}", scClkTck);
LOG.debug("totalTime {}", totalTime);
return (1.0e3 * Math.max(0, totalCpuTime) / Math.max(1, scClkTck)) / Math.max(1, totalTime);
public static double calculateCPUUtilization(
int numProcessors, long totalOperationTime, long cpuUsageTime) {
LOG.debug("totalCpuTime {}", cpuUsageTime);
LOG.debug("totalOperationTime {}", totalOperationTime);
LOG.debug("numProcessors {}", numProcessors);
if (totalOperationTime == 0l || cpuUsageTime == 0l || numProcessors == 0) {
return 0.0d;
}
double totalAvailableCPUTime = Double.valueOf(totalOperationTime * numProcessors);
return cpuUsageTime / totalAvailableCPUTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage;
import org.opensearch.performanceanalyzer.OpenSearchResources;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.commons.util.Util;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.search.internal.SearchContext;
Expand All @@ -32,9 +31,6 @@
import org.opensearch.telemetry.metrics.tags.Tags;

public class RTFPerformanceAnalyzerSearchListenerTests {
private static final long TOOK_IN_NANOS = 10;
private static final String EXCEPTION =
StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR.toString();

private RTFPerformanceAnalyzerSearchListener searchListener;

Expand All @@ -44,7 +40,8 @@ public class RTFPerformanceAnalyzerSearchListenerTests {
@Mock private PerformanceAnalyzerController controller;
@Mock private SearchShardTask task;
@Mock private MetricsRegistry metricsRegistry;
@Mock private Histogram histogram;
@Mock private Histogram cpuUtilizationHistogram;
@Mock private Histogram heapUsedHistogram;
@Mock private Index index;

@Mock private TaskResourceUsage taskResourceUsage;
Expand All @@ -66,7 +63,11 @@ public void init() {
Mockito.eq("CPU_Utilization"),
Mockito.anyString(),
Mockito.eq("rate")))
.thenReturn(histogram);
.thenReturn(cpuUtilizationHistogram);
Mockito.when(
metricsRegistry.createHistogram(
Mockito.eq("heap_used"), Mockito.anyString(), Mockito.eq("bytes")))
.thenReturn(heapUsedHistogram);
searchListener = new RTFPerformanceAnalyzerSearchListener(controller);
assertEquals(
RTFPerformanceAnalyzerSearchListener.class.getSimpleName(),
Expand Down Expand Up @@ -95,12 +96,6 @@ public void testQueryPhase() {
initializeValidSearchContext(true);
Mockito.when(controller.getCollectorsSettingValue())
.thenReturn(Util.CollectorMode.TELEMETRY.getValue());
Mockito.when(
metricsRegistry.createHistogram(
Mockito.eq("CPU_Utilization"),
Mockito.anyString(),
Mockito.eq("rate")))
.thenReturn(histogram);
searchListener.preQueryPhase(searchContext);
searchListener.queryPhase(searchContext, 0l);
Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any());
Expand All @@ -111,12 +106,6 @@ public void testQueryPhaseFailed() {
initializeValidSearchContext(true);
Mockito.when(controller.getCollectorsSettingValue())
.thenReturn(Util.CollectorMode.TELEMETRY.getValue());
Mockito.when(
metricsRegistry.createHistogram(
Mockito.eq("CPU_Utilization"),
Mockito.anyString(),
Mockito.eq("rate")))
.thenReturn(histogram);
searchListener.preQueryPhase(searchContext);
searchListener.failedQueryPhase(searchContext);
Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any());
Expand All @@ -127,12 +116,6 @@ public void testFetchPhase() {
initializeValidSearchContext(true);
Mockito.when(controller.getCollectorsSettingValue())
.thenReturn(Util.CollectorMode.TELEMETRY.getValue());
Mockito.when(
metricsRegistry.createHistogram(
Mockito.eq("CPU_Utilization"),
Mockito.anyString(),
Mockito.eq("rate")))
.thenReturn(histogram);
searchListener.preFetchPhase(searchContext);
searchListener.fetchPhase(searchContext, 0l);
Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any());
Expand All @@ -143,12 +126,6 @@ public void testFetchPhaseFailed() {
initializeValidSearchContext(true);
Mockito.when(controller.getCollectorsSettingValue())
.thenReturn(Util.CollectorMode.TELEMETRY.getValue());
Mockito.when(
metricsRegistry.createHistogram(
Mockito.eq("CPU_Utilization"),
Mockito.anyString(),
Mockito.eq("rate")))
.thenReturn(histogram);
searchListener.preFetchPhase(searchContext);
searchListener.failedFetchPhase(searchContext);
Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any());
Expand All @@ -157,12 +134,6 @@ public void testFetchPhaseFailed() {
@Test
public void testTaskCompletionListener() {
initializeValidSearchContext(true);
Mockito.when(
metricsRegistry.createHistogram(
Mockito.eq("CPU_Utilization"),
Mockito.anyString(),
Mockito.eq("rate")))
.thenReturn(histogram);
RTFPerformanceAnalyzerSearchListener rtfSearchListener =
new RTFPerformanceAnalyzerSearchListener(controller);

Expand All @@ -175,7 +146,9 @@ public void testTaskCompletionListener() {
NotifyOnceListener<Task> taskCompletionListener =
rtfSearchListener.createListener(searchContext, 0l, "test", false);
taskCompletionListener.onResponse(task);
Mockito.verify(histogram).record(Mockito.anyDouble(), Mockito.any(Tags.class));
Mockito.verify(cpuUtilizationHistogram)
.record(Mockito.anyDouble(), Mockito.any(Tags.class));
Mockito.verify(heapUsedHistogram).record(Mockito.anyDouble(), Mockito.any(Tags.class));
}

private void initializeValidSearchContext(boolean isValid) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.performanceanalyzer.util;

import org.junit.Assert;
import org.junit.Test;

public class UtilsTests {

@Test
public void testCPUUtilization() {
Assert.assertEquals(0.5, Utils.calculateCPUUtilization(2, 5, 5), 0.0);
Assert.assertEquals(1.0, Utils.calculateCPUUtilization(1, 5, 5), 0.0);
Assert.assertEquals(
Double.valueOf(10 / 15.0), Utils.calculateCPUUtilization(3, 5, 10), 0.0);
}

@Test
public void testCPUUtilizationZeroValue() {
Assert.assertEquals(0.0, Utils.calculateCPUUtilization(2, 5, 0), 0.0);
Assert.assertEquals(0.0, Utils.calculateCPUUtilization(2, 0, 5), 0.0);
Assert.assertEquals(0.0, Utils.calculateCPUUtilization(0, 5, 5), 0.0);
}
}

0 comments on commit 7b2e1e8

Please sign in to comment.