From aad2bfd3476253e4db85acd97a8142c68f1d1dcd Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Mon, 22 Jul 2024 19:46:21 +0530 Subject: [PATCH] Fixes the same task issue Signed-off-by: Gagan Juneja --- .../RTFPerformanceAnalyzerSearchListener.java | 81 ++++++++++++++++--- ...TFPerformanceAnalyzerTransportChannel.java | 2 +- .../performanceanalyzer/util/Utils.java | 7 +- ...erformanceAnalyzerSearchListenerTests.java | 14 +++- .../performanceanalyzer/util/UtilsTests.java | 17 ++-- 5 files changed, 99 insertions(+), 22 deletions(-) diff --git a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java index 059b8750..ea326b81 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java +++ b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java @@ -40,6 +40,8 @@ public class RTFPerformanceAnalyzerSearchListener private static final String OPERATION_SHARD_QUERY = "shard_query"; public static final String QUERY_START_TIME = "query_start_time"; public static final String FETCH_START_TIME = "fetch_start_time"; + public static final String QUERY_TIME = "query_time"; + public static final String QUERY_TASK_ID = "query_task_id"; private final ThreadLocal> threadLocal; private static final SearchListener NO_OP_SEARCH_LISTENER = new NoOpSearchListener(); @@ -73,7 +75,7 @@ private Histogram createHeapUsedHistogram() { MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); if (metricsRegistry != null) { return metricsRegistry.createHistogram( - RTFMetrics.HeapValue.HEAP_USED.toString(), + RTFMetrics.OSMetrics.HEAP_ALLOCATED.toString(), "Heap used per shard for an operation", RTFMetrics.MetricUnits.BYTE.toString()); } else { @@ -167,20 +169,24 @@ public void onFailedFetchPhase(SearchContext searchContext) { @Override public void preQueryPhase(SearchContext searchContext) { threadLocal.get().put(QUERY_START_TIME, System.nanoTime()); + threadLocal.get().put(QUERY_TASK_ID, searchContext.getTask().getId()); } @Override public void queryPhase(SearchContext searchContext, long tookInNanos) { long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l); + long queryTime = (System.nanoTime() - queryStartTime); + threadLocal.get().put(QUERY_TIME, queryTime); addResourceTrackingCompletionListener( - searchContext, queryStartTime, OPERATION_SHARD_QUERY, false); + searchContext, queryStartTime, queryTime, OPERATION_SHARD_QUERY, false); } @Override public void failedQueryPhase(SearchContext searchContext) { long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l); + long queryTime = (System.nanoTime() - queryStartTime); addResourceTrackingCompletionListener( - searchContext, queryStartTime, OPERATION_SHARD_QUERY, true); + searchContext, queryStartTime, queryTime, OPERATION_SHARD_QUERY, true); } @Override @@ -191,44 +197,92 @@ public void preFetchPhase(SearchContext searchContext) { @Override public void fetchPhase(SearchContext searchContext, long tookInNanos) { long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, 0l); - addResourceTrackingCompletionListener( + addResourceTrackingCompletionListenerForFetchPhase( searchContext, fetchStartTime, OPERATION_SHARD_FETCH, false); } @Override public void failedFetchPhase(SearchContext searchContext) { long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, 0l); - addResourceTrackingCompletionListener( + addResourceTrackingCompletionListenerForFetchPhase( searchContext, fetchStartTime, OPERATION_SHARD_FETCH, true); } private void addResourceTrackingCompletionListener( - SearchContext searchContext, long startTime, String operation, boolean isFailed) { + SearchContext searchContext, + long startTime, + long queryTime, + String operation, + boolean isFailed) { + addCompletionListener(searchContext, startTime, queryTime, operation, isFailed); + } + + private void addResourceTrackingCompletionListenerForFetchPhase( + SearchContext searchContext, long fetchStartTime, String operation, boolean isFailed) { + long startTime = fetchStartTime; + long queryTaskId = threadLocal.get().getOrDefault(QUERY_TASK_ID, 0l); + /** + * There are scenarios where both query and fetch pahses run in the same task for an + * optimization. Adding a special handling for that case to divide the CPU usage between + * these 2 operations by their runTime. + */ + if (queryTaskId == searchContext.getTask().getId()) { + startTime = threadLocal.get().getOrDefault(QUERY_TIME, 0l); + } + long fetchTime = System.nanoTime() - fetchStartTime; + addCompletionListener(searchContext, startTime, fetchTime, operation, isFailed); + } + + private void addCompletionListener( + SearchContext searchContext, + long overallStartTime, + long operationTime, + String operation, + boolean isFailed) { searchContext .getTask() .addResourceTrackingCompletionListener( createListener( searchContext, - (System.nanoTime() - startTime), + overallStartTime, + operationTime, operation, isFailed)); } @VisibleForTesting NotifyOnceListener createListener( - SearchContext searchContext, long totalTime, String operation, boolean isFailed) { + SearchContext searchContext, + long overallStartTime, + long totalOperationTime, + String operation, + boolean isFailed) { return new NotifyOnceListener() { @Override protected void innerOnResponse(Task task) { LOG.debug("Updating the counter for task {}", task.getId()); + /** + * There are scenarios where cpuUsageTime consists of the total of CPU of multiple + * operations. In that case we are computing the cpuShareFactor by dividing the + * particular operationTime and the total time till this calculation happen from the + * overall start time. + */ + double operationShareFactor = + computeShareFactor( + totalOperationTime, System.nanoTime() - overallStartTime); cpuUtilizationHistogram.record( Utils.calculateCPUUtilization( numProcessors, - totalTime, - task.getTotalResourceStats().getCpuTimeInNanos()), + totalOperationTime, + task.getTotalResourceStats().getCpuTimeInNanos(), + operationShareFactor), createTags()); heapUsedHistogram.record( - Math.max(0, task.getTotalResourceStats().getMemoryInBytes()), createTags()); + Math.max( + 0, + task.getTotalResourceStats().getMemoryInBytes() + * operationShareFactor), + createTags()); } private Tags createTags() { @@ -252,4 +306,9 @@ protected void innerOnFailure(Exception e) { } }; } + + @VisibleForTesting + static double computeShareFactor(long totalOperationTime, long totalTime) { + return Math.min(1, ((double) totalOperationTime) / Math.max(1.0, totalTime)); + } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java index 40465497..3cc4b353 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java @@ -98,7 +98,7 @@ private double calculateCPUUtilization(long phaseStartTime, long phaseCPUStartTi long totalCpuTime = Math.max(0, (threadMXBean.getThreadCpuTime(threadID) - phaseCPUStartTime)); return Utils.calculateCPUUtilization( - numProcessors, (System.nanoTime() - phaseStartTime), totalCpuTime); + numProcessors, (System.nanoTime() - phaseStartTime), totalCpuTime, 1.0); } @VisibleForTesting diff --git a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java index aca1b295..946a52bd 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java +++ b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java @@ -113,14 +113,15 @@ public static HashMap getShards() { IndexShardState.STARTED); public static double calculateCPUUtilization( - int numProcessors, long totalOperationTime, long cpuUsageTime) { + int numProcessors, long totalOperationTime, long cpuUsageTime, double cpuShareFactor) { + LOG.debug("numProcessors {}", numProcessors); + LOG.debug("cpuShareFactor {}", cpuShareFactor); LOG.debug("totalCpuTime {}", cpuUsageTime); LOG.debug("totalOperationTime {}", totalOperationTime); - LOG.debug("numProcessors {}", numProcessors); if (totalOperationTime == 0l || cpuUsageTime == 0l || numProcessors == 0) { return 0.0d; } double totalAvailableCPUTime = Double.valueOf(totalOperationTime * numProcessors); - return cpuUsageTime / totalAvailableCPUTime; + return cpuShareFactor * (cpuUsageTime / totalAvailableCPUTime); } } diff --git a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java index 208b47c6..9629abb2 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java @@ -131,6 +131,18 @@ public void testFetchPhaseFailed() { Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); } + @Test + public void testOperationShareFactor() { + assertEquals( + Double.valueOf(10.0 / 15), + RTFPerformanceAnalyzerSearchListener.computeShareFactor(10, 15), + 0); + assertEquals( + Double.valueOf(1), + RTFPerformanceAnalyzerSearchListener.computeShareFactor(15, 10), + 0); + } + @Test public void testTaskCompletionListener() { initializeValidSearchContext(true); @@ -144,7 +156,7 @@ public void testTaskCompletionListener() { Mockito.when(taskResourceUsage.getCpuTimeInNanos()).thenReturn(10l); NotifyOnceListener taskCompletionListener = - rtfSearchListener.createListener(searchContext, 0l, "test", false); + rtfSearchListener.createListener(searchContext, 0l, 0l, "test", false); taskCompletionListener.onResponse(task); Mockito.verify(cpuUtilizationHistogram) .record(Mockito.anyDouble(), Mockito.any(Tags.class)); diff --git a/src/test/java/org/opensearch/performanceanalyzer/util/UtilsTests.java b/src/test/java/org/opensearch/performanceanalyzer/util/UtilsTests.java index 2ee91987..b1e2490e 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/util/UtilsTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/util/UtilsTests.java @@ -12,16 +12,21 @@ public class UtilsTests { @Test public void testCPUUtilization() { - Assert.assertEquals(0.5, Utils.calculateCPUUtilization(2, 5, 5), 0.0); - Assert.assertEquals(1.0, Utils.calculateCPUUtilization(1, 5, 5), 0.0); + Assert.assertEquals(0.5, Utils.calculateCPUUtilization(2, 5, 5, 1.0), 0.0); + Assert.assertEquals(1.0, Utils.calculateCPUUtilization(1, 5, 5, 1.0), 0.0); Assert.assertEquals( - Double.valueOf(10 / 15.0), Utils.calculateCPUUtilization(3, 5, 10), 0.0); + Double.valueOf(10 / 15.0), Utils.calculateCPUUtilization(3, 5, 10, 1.0), 0.0); + Assert.assertEquals( + Double.valueOf(0.50 * (20 / 30.0)), + Utils.calculateCPUUtilization(3, 10, 20, 0.5d), + 0.0); } @Test public void testCPUUtilizationZeroValue() { - Assert.assertEquals(0.0, Utils.calculateCPUUtilization(2, 5, 0), 0.0); - Assert.assertEquals(0.0, Utils.calculateCPUUtilization(2, 0, 5), 0.0); - Assert.assertEquals(0.0, Utils.calculateCPUUtilization(0, 5, 5), 0.0); + Assert.assertEquals(0.0, Utils.calculateCPUUtilization(2, 5, 0, 1.0), 0.0); + Assert.assertEquals(0.0, Utils.calculateCPUUtilization(2, 0, 5, 1.0), 0.0); + Assert.assertEquals(0.0, Utils.calculateCPUUtilization(0, 5, 5, 1.0), 0.0); + Assert.assertEquals(0.0, Utils.calculateCPUUtilization(0, 5, 5, 0.0), 0.0); } }