Skip to content

Commit

Permalink
Fixes the same task issue
Browse files Browse the repository at this point in the history
Signed-off-by: Gagan Juneja <[email protected]>
  • Loading branch information
Gagan Juneja committed Jul 22, 2024
1 parent d7d5660 commit aad2bfd
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public class RTFPerformanceAnalyzerSearchListener
private static final String OPERATION_SHARD_QUERY = "shard_query";
public static final String QUERY_START_TIME = "query_start_time";
public static final String FETCH_START_TIME = "fetch_start_time";
public static final String QUERY_TIME = "query_time";
public static final String QUERY_TASK_ID = "query_task_id";
private final ThreadLocal<Map<String, Long>> threadLocal;
private static final SearchListener NO_OP_SEARCH_LISTENER = new NoOpSearchListener();

Expand Down Expand Up @@ -73,7 +75,7 @@ private Histogram createHeapUsedHistogram() {
MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry();
if (metricsRegistry != null) {
return metricsRegistry.createHistogram(
RTFMetrics.HeapValue.HEAP_USED.toString(),
RTFMetrics.OSMetrics.HEAP_ALLOCATED.toString(),
"Heap used per shard for an operation",
RTFMetrics.MetricUnits.BYTE.toString());
} else {
Expand Down Expand Up @@ -167,20 +169,24 @@ public void onFailedFetchPhase(SearchContext searchContext) {
/**
 * Records, in the calling thread's map, the moment the query phase begins and the id of the
 * task that runs it, so later phase callbacks on the same thread can read both values.
 */
@Override
public void preQueryPhase(SearchContext searchContext) {
    final Map<String, Long> phaseState = threadLocal.get();
    // Timestamp first, task id second: same order as the values are consumed later.
    phaseState.put(QUERY_START_TIME, System.nanoTime());
    phaseState.put(QUERY_TASK_ID, searchContext.getTask().getId());
}

/**
 * Called when the shard query phase completes. Computes the elapsed query time from the
 * QUERY_START_TIME entry of the thread-local map (0 when the entry is absent), stores that
 * duration under QUERY_TIME, and registers a resource-tracking completion listener for
 * OPERATION_SHARD_QUERY with isFailed = false. The tookInNanos argument is not used here.
 */
@Override
public void queryPhase(SearchContext searchContext, long tookInNanos) {
long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l);
long queryTime = (System.nanoTime() - queryStartTime);
threadLocal.get().put(QUERY_TIME, queryTime);
addResourceTrackingCompletionListener(
// NOTE(review): the two argument lines below appear to be the removed and the added side of
// the diff; only the second one (which passes queryTime) matches the five-argument helper.
searchContext, queryStartTime, OPERATION_SHARD_QUERY, false);
searchContext, queryStartTime, queryTime, OPERATION_SHARD_QUERY, false);
}

/**
 * Called when the shard query phase fails. Computes the elapsed query time from the
 * QUERY_START_TIME entry of the thread-local map (0 when the entry is absent) and registers a
 * resource-tracking completion listener for OPERATION_SHARD_QUERY with isFailed = true.
 * Unlike queryPhase, the duration is not stored under QUERY_TIME.
 */
@Override
public void failedQueryPhase(SearchContext searchContext) {
long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l);
long queryTime = (System.nanoTime() - queryStartTime);
addResourceTrackingCompletionListener(
// NOTE(review): the two argument lines below appear to be the removed and the added side of
// the diff; only the second one (which passes queryTime) matches the five-argument helper.
searchContext, queryStartTime, OPERATION_SHARD_QUERY, true);
searchContext, queryStartTime, queryTime, OPERATION_SHARD_QUERY, true);
}

@Override
Expand All @@ -191,44 +197,92 @@ public void preFetchPhase(SearchContext searchContext) {
/**
 * Called when the shard fetch phase completes. Reads the FETCH_START_TIME entry of the
 * thread-local map (0 when absent) and registers a completion listener for
 * OPERATION_SHARD_FETCH with isFailed = false. The tookInNanos argument is not used here.
 */
@Override
public void fetchPhase(SearchContext searchContext, long tookInNanos) {
long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, 0l);
// NOTE(review): the two callee lines below appear to be the removed and the added side of the
// diff; the second one is the fetch-specific helper that this commit introduces.
addResourceTrackingCompletionListener(
addResourceTrackingCompletionListenerForFetchPhase(
searchContext, fetchStartTime, OPERATION_SHARD_FETCH, false);
}

/**
 * Called when the shard fetch phase fails. Reads the FETCH_START_TIME entry of the
 * thread-local map (0 when absent) and registers a completion listener for
 * OPERATION_SHARD_FETCH with isFailed = true.
 */
@Override
public void failedFetchPhase(SearchContext searchContext) {
long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, 0l);
// NOTE(review): the two callee lines below appear to be the removed and the added side of the
// diff; the second one is the fetch-specific helper that this commit introduces.
addResourceTrackingCompletionListener(
addResourceTrackingCompletionListenerForFetchPhase(
searchContext, fetchStartTime, OPERATION_SHARD_FETCH, true);
}

/**
 * Registers the completion listener for a query-phase operation by forwarding every argument
 * unchanged to addCompletionListener.
 *
 * @param searchContext context whose task receives the listener
 * @param startTime System.nanoTime() reading taken when the query phase started
 * @param queryTime elapsed query-phase time in nanoseconds
 * @param operation operation name passed through to the listener
 * @param isFailed whether the phase failed
 */
private void addResourceTrackingCompletionListener(
// NOTE(review): the next line appears to be the removed four-parameter signature from the
// diff; the five-parameter list that follows it is the added side.
SearchContext searchContext, long startTime, String operation, boolean isFailed) {
SearchContext searchContext,
long startTime,
long queryTime,
String operation,
boolean isFailed) {
addCompletionListener(searchContext, startTime, queryTime, operation, isFailed);
}

/**
 * Registers the completion listener for a fetch-phase operation.
 *
 * <p>There are scenarios where both the query and the fetch phases run in the same task as an
 * optimization. In that case the task's resource usage has to be divided between the two
 * operations by their run time, so the overall window handed to the listener must start at
 * the query start timestamp rather than at the fetch start timestamp.
 *
 * @param searchContext context whose task receives the listener
 * @param fetchStartTime System.nanoTime() reading taken when the fetch phase started
 * @param operation operation name passed through to the listener
 * @param isFailed whether the fetch phase failed
 */
private void addResourceTrackingCompletionListenerForFetchPhase(
        SearchContext searchContext, long fetchStartTime, String operation, boolean isFailed) {
    long startTime = fetchStartTime;
    long queryTaskId = threadLocal.get().getOrDefault(QUERY_TASK_ID, 0L);
    if (queryTaskId == searchContext.getTask().getId()) {
        // The overall start must be a System.nanoTime() timestamp, because the listener later
        // computes System.nanoTime() - overallStartTime. QUERY_TIME holds a duration, not a
        // timestamp, so the query start timestamp is the value to use here. Fall back to the
        // fetch start if no query start was recorded on this thread.
        startTime = threadLocal.get().getOrDefault(QUERY_START_TIME, fetchStartTime);
    }
    long fetchTime = System.nanoTime() - fetchStartTime;
    addCompletionListener(searchContext, startTime, fetchTime, operation, isFailed);
}

/**
 * Builds a listener via createListener and attaches it to the search context's task through
 * addResourceTrackingCompletionListener.
 *
 * @param searchContext context whose task receives the listener
 * @param overallStartTime start of the window against which the operation's share is computed
 * @param operationTime elapsed time of this operation in nanoseconds
 * @param operation operation name passed through to the listener
 * @param isFailed whether the operation failed
 */
private void addCompletionListener(
SearchContext searchContext,
long overallStartTime,
long operationTime,
String operation,
boolean isFailed) {
searchContext
.getTask()
.addResourceTrackingCompletionListener(
createListener(
searchContext,
// NOTE(review): the next line appears to be the removed side of the diff (it refers to a
// startTime variable that this method does not declare); the two lines after it are the
// added arguments.
(System.nanoTime() - startTime),
overallStartTime,
operationTime,
operation,
isFailed));
}

@VisibleForTesting
NotifyOnceListener<Task> createListener(
SearchContext searchContext, long totalTime, String operation, boolean isFailed) {
SearchContext searchContext,
long overallStartTime,
long totalOperationTime,
String operation,
boolean isFailed) {
return new NotifyOnceListener<Task>() {
@Override
protected void innerOnResponse(Task task) {
LOG.debug("Updating the counter for task {}", task.getId());
/**
 * There are scenarios where cpuUsageTime is the total CPU time of multiple operations.
 * In that case we compute the share factor by dividing this operation's time by the
 * total time elapsed between the overall start time and the moment this calculation
 * happens.
 */
double operationShareFactor =
computeShareFactor(
totalOperationTime, System.nanoTime() - overallStartTime);
cpuUtilizationHistogram.record(
Utils.calculateCPUUtilization(
numProcessors,
totalTime,
task.getTotalResourceStats().getCpuTimeInNanos()),
totalOperationTime,
task.getTotalResourceStats().getCpuTimeInNanos(),
operationShareFactor),
createTags());
heapUsedHistogram.record(
Math.max(0, task.getTotalResourceStats().getMemoryInBytes()), createTags());
Math.max(
0,
task.getTotalResourceStats().getMemoryInBytes()
* operationShareFactor),
createTags());
}

private Tags createTags() {
Expand All @@ -252,4 +306,9 @@ protected void innerOnFailure(Exception e) {
}
};
}

@VisibleForTesting
/*
 * Returns the fraction of totalTime that totalOperationTime accounts for, clamped to the
 * range [0, 1].
 *
 * totalOperationTime: duration of the single operation.
 * totalTime: duration of the whole window; values below 1 are treated as 1 so the division
 * is always defined.
 */
static double computeShareFactor(long totalOperationTime, long totalTime) {
    double ratio = ((double) totalOperationTime) / Math.max(1.0, totalTime);
    // Clamp on both sides: a negative duration must not turn into a negative share, which
    // would produce a negative CPU utilization sample downstream.
    return Math.max(0.0, Math.min(1.0, ratio));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private double calculateCPUUtilization(long phaseStartTime, long phaseCPUStartTi
long totalCpuTime =
Math.max(0, (threadMXBean.getThreadCpuTime(threadID) - phaseCPUStartTime));
return Utils.calculateCPUUtilization(
numProcessors, (System.nanoTime() - phaseStartTime), totalCpuTime);
numProcessors, (System.nanoTime() - phaseStartTime), totalCpuTime, 1.0);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,15 @@ public static HashMap<ShardId, IndexShard> getShards() {
IndexShardState.STARTED);

/**
 * Computes CPU utilization as cpuUsageTime divided by the CPU time available during the
 * operation (totalOperationTime * numProcessors), scaled by cpuShareFactor.
 *
 * @param numProcessors number of processors
 * @param totalOperationTime elapsed time of the operation
 * @param cpuUsageTime CPU time consumed
 * @param cpuShareFactor multiplier applied to the computed utilization
 * @return 0.0 when totalOperationTime, cpuUsageTime or numProcessors is zero; otherwise the
 *     scaled utilization
 */
public static double calculateCPUUtilization(
// NOTE(review): the next line appears to be the removed three-parameter signature from the
// diff; the four-parameter line after it is the added side.
int numProcessors, long totalOperationTime, long cpuUsageTime) {
int numProcessors, long totalOperationTime, long cpuUsageTime, double cpuShareFactor) {
LOG.debug("numProcessors {}", numProcessors);
LOG.debug("cpuShareFactor {}", cpuShareFactor);
LOG.debug("totalCpuTime {}", cpuUsageTime);
LOG.debug("totalOperationTime {}", totalOperationTime);
LOG.debug("numProcessors {}", numProcessors);
// Any zero input short-circuits to 0.0, which also avoids dividing by zero below.
if (totalOperationTime == 0l || cpuUsageTime == 0l || numProcessors == 0) {
return 0.0d;
}
double totalAvailableCPUTime = Double.valueOf(totalOperationTime * numProcessors);
// NOTE(review): the first return below appears to be the removed side of the diff; the
// second one, which applies cpuShareFactor, is the added side.
return cpuUsageTime / totalAvailableCPUTime;
return cpuShareFactor * (cpuUsageTime / totalAvailableCPUTime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,18 @@ public void testFetchPhaseFailed() {
Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any());
}

/** Checks computeShareFactor for a ratio below one and for a ratio that exceeds one. */
@Test
public void testOperationShareFactor() {
    // Operation time smaller than the total time: the factor is their plain ratio.
    double partialShare = RTFPerformanceAnalyzerSearchListener.computeShareFactor(10, 15);
    assertEquals(10.0 / 15, partialShare, 0);

    // Operation time larger than the total time: the expected factor is 1.
    double cappedShare = RTFPerformanceAnalyzerSearchListener.computeShareFactor(15, 10);
    assertEquals(1.0, cappedShare, 0);
}

@Test
public void testTaskCompletionListener() {
initializeValidSearchContext(true);
Expand All @@ -144,7 +156,7 @@ public void testTaskCompletionListener() {
Mockito.when(taskResourceUsage.getCpuTimeInNanos()).thenReturn(10l);

NotifyOnceListener<Task> taskCompletionListener =
rtfSearchListener.createListener(searchContext, 0l, "test", false);
rtfSearchListener.createListener(searchContext, 0l, 0l, "test", false);
taskCompletionListener.onResponse(task);
Mockito.verify(cpuUtilizationHistogram)
.record(Mockito.anyDouble(), Mockito.any(Tags.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,21 @@ public class UtilsTests {

/**
 * Checks Utils.calculateCPUUtilization for non-zero inputs, including one case with a share
 * factor of 0.5.
 */
@Test
public void testCPUUtilization() {
// NOTE(review): the three-argument calls in this method appear to be the removed side of the
// diff; the four-argument calls are the added side.
Assert.assertEquals(0.5, Utils.calculateCPUUtilization(2, 5, 5), 0.0);
Assert.assertEquals(1.0, Utils.calculateCPUUtilization(1, 5, 5), 0.0);
Assert.assertEquals(0.5, Utils.calculateCPUUtilization(2, 5, 5, 1.0), 0.0);
Assert.assertEquals(1.0, Utils.calculateCPUUtilization(1, 5, 5, 1.0), 0.0);
Assert.assertEquals(
Double.valueOf(10 / 15.0), Utils.calculateCPUUtilization(3, 5, 10), 0.0);
Double.valueOf(10 / 15.0), Utils.calculateCPUUtilization(3, 5, 10, 1.0), 0.0);
Assert.assertEquals(
Double.valueOf(0.50 * (20 / 30.0)),
Utils.calculateCPUUtilization(3, 10, 20, 0.5d),
0.0);
}

/**
 * Checks that Utils.calculateCPUUtilization returns 0.0 when the CPU time, the operation time
 * or the processor count is zero.
 */
@Test
public void testCPUUtilizationZeroValue() {
// NOTE(review): the three-argument calls in this method appear to be the removed side of the
// diff; the four-argument calls are the added side.
Assert.assertEquals(0.0, Utils.calculateCPUUtilization(2, 5, 0), 0.0);
Assert.assertEquals(0.0, Utils.calculateCPUUtilization(2, 0, 5), 0.0);
Assert.assertEquals(0.0, Utils.calculateCPUUtilization(0, 5, 5), 0.0);
Assert.assertEquals(0.0, Utils.calculateCPUUtilization(2, 5, 0, 1.0), 0.0);
Assert.assertEquals(0.0, Utils.calculateCPUUtilization(2, 0, 5, 1.0), 0.0);
Assert.assertEquals(0.0, Utils.calculateCPUUtilization(0, 5, 5, 1.0), 0.0);
Assert.assertEquals(0.0, Utils.calculateCPUUtilization(0, 5, 5, 0.0), 0.0);
}
}

0 comments on commit aad2bfd

Please sign in to comment.