Skip to content

Commit

Permalink
Addresses the review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Gagan Juneja <[email protected]>
  • Loading branch information
Gagan Juneja committed Jul 23, 2024
1 parent 6899623 commit 34ebcc4
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ public boolean isCollectorDisabled(
*
* @return collectorsSettingValue
*/
public int getCollectorsSettingValue() {
public int getCollectorsRunModeValue() {
return collectorsSettingValue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ SearchListener getSearchListener() {

private boolean isSearchListenerEnabled() {
return controller.isPerformanceAnalyzerEnabled()
&& (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue()
|| controller.getCollectorsSettingValue()
&& (controller.getCollectorsRunModeValue() == Util.CollectorMode.DUAL.getValue()
|| controller.getCollectorsRunModeValue()
== Util.CollectorMode.RCA.getValue());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,15 @@ public class RTFPerformanceAnalyzerSearchListener

public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController controller) {
this.controller = controller;
this.cpuUtilizationHistogram = createCPUUtilizationHistogram();
heapUsedHistogram = createHeapUsedHistogram();
this.cpuUtilizationHistogram =
createCPUUtilizationHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry());
this.heapUsedHistogram =
createHeapUsedHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry());
this.threadLocal = ThreadLocal.withInitial(() -> new HashMap<String, Long>());
this.numProcessors = Runtime.getRuntime().availableProcessors();
}

private Histogram createCPUUtilizationHistogram() {
MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry();
private Histogram createCPUUtilizationHistogram(MetricsRegistry metricsRegistry) {
if (metricsRegistry != null) {
return metricsRegistry.createHistogram(
RTFMetrics.OSMetrics.CPU_UTILIZATION.toString(),
Expand All @@ -70,8 +71,7 @@ private Histogram createCPUUtilizationHistogram() {
}
}

private Histogram createHeapUsedHistogram() {
MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry();
private Histogram createHeapUsedHistogram(MetricsRegistry metricsRegistry) {
if (metricsRegistry != null) {
return metricsRegistry.createHistogram(
RTFMetrics.OSMetrics.HEAP_ALLOCATED.toString(),
Expand All @@ -94,14 +94,10 @@ SearchListener getSearchListener() {
}

private boolean isSearchListenerEnabled() {
LOG.debug(
"Controller enable status {}, CollectorMode value {}",
controller.isPerformanceAnalyzerEnabled(),
controller.getCollectorsSettingValue());
return OpenSearchResources.INSTANCE.getMetricsRegistry() != null
&& controller.isPerformanceAnalyzerEnabled()
&& (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue()
|| controller.getCollectorsSettingValue()
&& (controller.getCollectorsRunModeValue() == Util.CollectorMode.DUAL.getValue()
|| controller.getCollectorsRunModeValue()
== Util.CollectorMode.TELEMETRY.getValue());
}

Expand Down Expand Up @@ -173,15 +169,15 @@ public void preQueryPhase(SearchContext searchContext) {

@Override
public void queryPhase(SearchContext searchContext, long tookInNanos) {
long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l);
long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime());
long queryTime = (System.nanoTime() - queryStartTime);
addResourceTrackingCompletionListener(
searchContext, queryStartTime, queryTime, OPERATION_SHARD_QUERY, false);
}

@Override
public void failedQueryPhase(SearchContext searchContext) {
long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l);
long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime());
long queryTime = (System.nanoTime() - queryStartTime);
addResourceTrackingCompletionListener(
searchContext, queryStartTime, queryTime, OPERATION_SHARD_QUERY, true);
Expand All @@ -194,15 +190,15 @@ public void preFetchPhase(SearchContext searchContext) {

@Override
public void fetchPhase(SearchContext searchContext, long tookInNanos) {
long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, 0l);
long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, System.nanoTime());
long fetchTime = (System.nanoTime() - fetchStartTime);
addResourceTrackingCompletionListenerForFetchPhase(
searchContext, fetchStartTime, fetchTime, OPERATION_SHARD_FETCH, false);
}

@Override
public void failedFetchPhase(SearchContext searchContext) {
long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, 0l);
long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, System.nanoTime());
long fetchTime = (System.nanoTime() - fetchStartTime);
addResourceTrackingCompletionListenerForFetchPhase(
searchContext, fetchStartTime, fetchTime, OPERATION_SHARD_FETCH, true);
Expand All @@ -226,12 +222,12 @@ private void addResourceTrackingCompletionListenerForFetchPhase(
long overallStartTime = fetchStartTime;
long queryTaskId = threadLocal.get().getOrDefault(QUERY_TASK_ID, 0l);
/**
* There are scenarios where both query and fetch pahses run in the same task for an
* There are scenarios where both query and fetch phases run in the same task for an
* optimization. Adding a special handling for that case to divide the CPU usage between
* these 2 operations by their runTime.
*/
if (queryTaskId == searchContext.getTask().getId()) {
overallStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l);
overallStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime());
}
addCompletionListener(searchContext, overallStartTime, fetchTime, operation, isFailed);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ TransportChannel getChannel(T request, TransportChannel channel, Task task) {

private boolean isCollectorEnabled() {
return controller.isPerformanceAnalyzerEnabled()
&& (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue()
|| controller.getCollectorsSettingValue()
&& (controller.getCollectorsRunModeValue() == Util.CollectorMode.DUAL.getValue()
|| controller.getCollectorsRunModeValue()
== Util.CollectorMode.RCA.getValue());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public final class RTFPerformanceAnalyzerTransportRequestHandler<T extends Trans
LogManager.getLogger(RTFPerformanceAnalyzerTransportRequestHandler.class);
private final PerformanceAnalyzerController controller;
private TransportRequestHandler<T> actualHandler;
boolean logOnce = false;
private boolean logOnce = false;
private final Histogram cpuUtilizationHistogram;

RTFPerformanceAnalyzerTransportRequestHandler(
Expand All @@ -52,7 +52,7 @@ private Histogram createCPUUtilizationHistogram() {
return metricsRegistry.createHistogram(
RTFMetrics.OSMetrics.CPU_UTILIZATION.toString(),
"CPU Utilization per shard for an operation",
"rate");
RTFMetrics.MetricUnits.RATE.toString());
} else {
return null;
}
Expand All @@ -79,8 +79,8 @@ TransportChannel getChannel(T request, TransportChannel channel, Task task) {
private boolean isCollectorEnabled() {
return OpenSearchResources.INSTANCE.getMetricsRegistry() != null
&& controller.isPerformanceAnalyzerEnabled()
&& (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue()
|| controller.getCollectorsSettingValue()
&& (controller.getCollectorsRunModeValue() == Util.CollectorMode.DUAL.getValue()
|| controller.getCollectorsRunModeValue()
== Util.CollectorMode.TELEMETRY.getValue());
}

Expand Down
29 changes: 24 additions & 5 deletions src/main/java/org/opensearch/performanceanalyzer/util/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,18 +112,37 @@ public static HashMap<ShardId, IndexShard> getShards() {
IndexShardState.POST_RECOVERY,
IndexShardState.STARTED);

/**
* CPU Utilization is the time spend in CPU cycles divide by the total time cpu available time.
* Total cpu available time would be the multiplication of num of processors and the process
* time. It also takes into account the cpuShareFactor in case some adjustments are needed.
*
* @param numProcessors
* @param totalOperationTime
* @param cpuUsageTime
* @param cpuShareFactor
* @return
*/
public static double calculateCPUUtilization(
int numProcessors, long totalOperationTime, long cpuUsageTime, double cpuShareFactor) {
LOG.debug("CPUUtilization calculation - numProcessors {}", numProcessors);
LOG.debug("CPUUtilization calculation - cpuShareFactor {}", cpuShareFactor);
LOG.debug("CPUUtilization calculation - totalCpuTime {}", cpuUsageTime);
LOG.debug("CPUUtilization calculation - totalOperationTime {}", totalOperationTime);
LOG.debug(
"Performance Analyzer CPUUtilization calculation with numProcessors: {}",
numProcessors);
LOG.debug(
"Performance Analyzer CPUUtilization calculation with cpuShareFactor {}",
cpuShareFactor);
LOG.debug(
"Performance Analyzer CPUUtilization calculation with totalCpuTime {}",
cpuUsageTime);
LOG.debug(
"Performance Analyzer CPUUtilization calculation with totalOperationTime {}",
totalOperationTime);
if (totalOperationTime == 0l || cpuUsageTime == 0l || numProcessors == 0) {
return 0.0d;
}
double totalAvailableCPUTime = Double.valueOf(totalOperationTime * numProcessors);
double cpuUtil = cpuShareFactor * (cpuUsageTime / totalAvailableCPUTime);
LOG.debug("CPUUtilization calculation - cpuUtil {}", cpuUtil);
LOG.debug("Performance Analyzer CPUUtilization calculation with cpuUtil {}", cpuUtil);
return cpuUtil;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,15 @@ public void init() {

@Test
public void tesSearchListener() {
Mockito.when(controller.getCollectorsSettingValue())
Mockito.when(controller.getCollectorsRunModeValue())
.thenReturn(Util.CollectorMode.TELEMETRY.getValue());
assertTrue(searchListener.getSearchListener() instanceof NoOpSearchListener);

Mockito.when(controller.getCollectorsSettingValue())
Mockito.when(controller.getCollectorsRunModeValue())
.thenReturn(Util.CollectorMode.RCA.getValue());
assertTrue(searchListener.getSearchListener() instanceof PerformanceAnalyzerSearchListener);

Mockito.when(controller.getCollectorsSettingValue())
Mockito.when(controller.getCollectorsRunModeValue())
.thenReturn(Util.CollectorMode.DUAL.getValue());
assertTrue(searchListener.getSearchListener() instanceof PerformanceAnalyzerSearchListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,16 @@ public void init() {

@Test
public void tesSearchListener() {
Mockito.when(controller.getCollectorsSettingValue())
Mockito.when(controller.getCollectorsRunModeValue())
.thenReturn(Util.CollectorMode.RCA.getValue());
assertTrue(searchListener.getSearchListener() instanceof NoOpSearchListener);

Mockito.when(controller.getCollectorsSettingValue())
Mockito.when(controller.getCollectorsRunModeValue())
.thenReturn(Util.CollectorMode.TELEMETRY.getValue());
assertTrue(
searchListener.getSearchListener() instanceof RTFPerformanceAnalyzerSearchListener);

Mockito.when(controller.getCollectorsSettingValue())
Mockito.when(controller.getCollectorsRunModeValue())
.thenReturn(Util.CollectorMode.DUAL.getValue());
assertTrue(
searchListener.getSearchListener() instanceof RTFPerformanceAnalyzerSearchListener);
Expand All @@ -94,7 +94,7 @@ public void tesSearchListener() {
@Test
public void testQueryPhase() {
initializeValidSearchContext(true);
Mockito.when(controller.getCollectorsSettingValue())
Mockito.when(controller.getCollectorsRunModeValue())
.thenReturn(Util.CollectorMode.TELEMETRY.getValue());
searchListener.preQueryPhase(searchContext);
searchListener.queryPhase(searchContext, 0l);
Expand All @@ -104,7 +104,7 @@ public void testQueryPhase() {
@Test
public void testQueryPhaseFailed() {
initializeValidSearchContext(true);
Mockito.when(controller.getCollectorsSettingValue())
Mockito.when(controller.getCollectorsRunModeValue())
.thenReturn(Util.CollectorMode.TELEMETRY.getValue());
searchListener.preQueryPhase(searchContext);
searchListener.failedQueryPhase(searchContext);
Expand All @@ -114,7 +114,7 @@ public void testQueryPhaseFailed() {
@Test
public void testFetchPhase() {
initializeValidSearchContext(true);
Mockito.when(controller.getCollectorsSettingValue())
Mockito.when(controller.getCollectorsRunModeValue())
.thenReturn(Util.CollectorMode.TELEMETRY.getValue());
searchListener.preFetchPhase(searchContext);
searchListener.fetchPhase(searchContext, 0l);
Expand All @@ -124,7 +124,7 @@ public void testFetchPhase() {
@Test
public void testFetchPhaseFailed() {
initializeValidSearchContext(true);
Mockito.when(controller.getCollectorsSettingValue())
Mockito.when(controller.getCollectorsRunModeValue())
.thenReturn(Util.CollectorMode.TELEMETRY.getValue());
searchListener.preFetchPhase(searchContext);
searchListener.failedFetchPhase(searchContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void testGetChannel() {

@Test
public void testGetChannelIfRCAModeIsDisabled() {
Mockito.when(controller.getCollectorsSettingValue())
Mockito.when(controller.getCollectorsRunModeValue())
.thenReturn(Util.CollectorMode.TELEMETRY.getValue());
concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1);
handler.getChannel(concreteShardRequest, channel, task);
Expand All @@ -85,7 +85,7 @@ public void testGetChannelIfRCAModeIsDisabled() {

@Test
public void testGetChannelIfDualModeIsEnabled() {
Mockito.when(controller.getCollectorsSettingValue())
Mockito.when(controller.getCollectorsRunModeValue())
.thenReturn(Util.CollectorMode.DUAL.getValue());
concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1);
handler.getChannel(concreteShardRequest, channel, task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void testMessageReceived() throws Exception {
@Test
public void testGetChannel() {
OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry);
Mockito.when(controller.getCollectorsSettingValue())
Mockito.when(controller.getCollectorsRunModeValue())
.thenReturn(Util.CollectorMode.TELEMETRY.getValue());
concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1);
handler.getChannel(concreteShardRequest, channel, task);
Expand All @@ -79,7 +79,7 @@ public void testGetChannel() {
@Test
public void testGetChannelTelemetryIsDisabled() {
OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry);
Mockito.when(controller.getCollectorsSettingValue())
Mockito.when(controller.getCollectorsRunModeValue())
.thenReturn(Util.CollectorMode.RCA.getValue());
concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1);
handler.getChannel(concreteShardRequest, channel, task);
Expand All @@ -93,7 +93,7 @@ public void testGetChannelTelemetryIsDisabled() {
@Test
public void testGetChannelDualMode() {
OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry);
Mockito.when(controller.getCollectorsSettingValue())
Mockito.when(controller.getCollectorsRunModeValue())
.thenReturn(Util.CollectorMode.DUAL.getValue());
concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1);
handler.getChannel(concreteShardRequest, channel, task);
Expand All @@ -107,7 +107,7 @@ public void testGetChannelDualMode() {
@Test
public void testGetChannelMetricRegistryIsNull() {
OpenSearchResources.INSTANCE.setMetricsRegistry(null);
Mockito.when(controller.getCollectorsSettingValue())
Mockito.when(controller.getCollectorsRunModeValue())
.thenReturn(Util.CollectorMode.RCA.getValue());
concreteShardRequest = new ConcreteShardRequest(bulkShardRequest, "id", 1);
handler.getChannel(concreteShardRequest, channel, task);
Expand Down

0 comments on commit 34ebcc4

Please sign in to comment.