Skip to content

Commit

Permalink
add additional test case to test the circuit breaker for SBP logic
Browse files Browse the repository at this point in the history
Signed-off-by: Kaushal Kumar <[email protected]>
  • Loading branch information
kaushalmahi12 committed Jun 19, 2024
1 parent b8dba28 commit 0c7043d
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,12 @@ void doRun() {
List<CancellableTask> searchTasks = getTaskByType(SearchTask.class);
List<CancellableTask> searchShardTasks = getTaskByType(SearchShardTask.class);

boolean isHeapUsageDominatedBySearchTasks = HeapUsageTracker.isHeapUsageDominatedBySearch(
boolean isHeapUsageDominatedBySearchTasks = isHeapUsageDominatedBySearch(
searchTasks,
getSettings().getSearchTaskSettings().getTotalHeapPercentThreshold()
);
boolean isHeapUsageDominatedBySearchShardTasks = HeapUsageTracker.isHeapUsageDominatedBySearch(
searchTasks,
boolean isHeapUsageDominatedBySearchShardTasks = isHeapUsageDominatedBySearch(
searchShardTasks,
getSettings().getSearchShardTaskSettings().getTotalHeapPercentThreshold()
);
final Map<Class<? extends SearchBackpressureTask>, List<CancellableTask>> cancellableTasks = Map.of(
Expand All @@ -219,6 +219,7 @@ void doRun() {
// Since these cancellations might be duplicate due to multiple trackers causing cancellation for same task
// We need to merge them
taskCancellations = mergeTaskCancellations(taskCancellations).stream()
.map(this::addSBPStateUpdateCallback)
.filter(TaskCancellation::isEligibleForCancellation)
.collect(Collectors.toList());

Expand Down Expand Up @@ -252,6 +253,28 @@ void doRun() {
}
}

/**
 * Instance-level delegate for {@link HeapUsageTracker#isHeapUsageDominatedBySearch}.
 * It exists so that tests can stub this call on the service instead of having to mock the static method,
 * e.g. to cover the scenario where search traffic must not be penalised because it is not breaching the threshold.
 *
 * @param searchTasks tasks forwarded unchanged to the heap usage tracker
 * @param threshold threshold forwarded unchanged to the heap usage tracker; callers in this class pass the
 *                  total heap percent threshold taken from the task settings
 * @return the value returned by {@code HeapUsageTracker.isHeapUsageDominatedBySearch} for the given arguments
 */
boolean isHeapUsageDominatedBySearch(List<CancellableTask> searchTasks, double threshold) {
    final boolean dominatedBySearch = HeapUsageTracker.isHeapUsageDominatedBySearch(searchTasks, threshold);
    return dominatedBySearch;
}

/**
 * Builds a new {@link TaskCancellation} for the same task and reasons whose on-cancel callbacks are the
 * original callbacks followed by one extra callback that increments the cancellation count of the
 * search backpressure state for the task's type.
 *
 * @param taskCancellation cancellation to decorate; it is not modified
 * @return a new cancellation carrying the additional state-update callback
 */
private TaskCancellation addSBPStateUpdateCallback(TaskCancellation taskCancellation) {
    final CancellableTask cancellableTask = taskCancellation.getTask();
    // The shard-level state is the fallback for every task that is not a SearchTask.
    final Runnable shardStateCounter = searchBackpressureStates.get(SearchShardTask.class)::incrementCancellationCount;
    final Runnable stateCounter = cancellableTask instanceof SearchTask
        ? searchBackpressureStates.get(SearchTask.class)::incrementCancellationCount
        : shardStateCounter;

    // Copy first so the callback list held by the incoming cancellation is left untouched.
    final List<Runnable> callbacks = new ArrayList<>(taskCancellation.getOnCancelCallbacks());
    callbacks.add(stateCounter);
    return new TaskCancellation(cancellableTask, taskCancellation.getReasons(), callbacks);
}

/**
 * Looks up the apply-condition registered for the given tracker type and evaluates it against the
 * node duress trackers.
 *
 * @param trackerType tracker type whose condition should be evaluated
 * @return the result of applying that condition to {@code nodeDuressTrackers}
 */
private boolean shouldApply(TaskResourceUsageTrackerType trackerType) {
    final boolean conditionHolds = trackerApplyConditions.get(trackerType).apply(nodeDuressTrackers);
    return conditionHolds;
}
Expand All @@ -267,27 +290,13 @@ private List<TaskCancellation> addResourceTrackerBasedCancellations(
final Class<? extends SearchBackpressureTask> taskType = taskResourceUsageTrackers.getKey();

taskResourceUsageTracker.ifPresent(
tracker -> taskCancellations.addAll(
tracker.getTaskCancellations(
cancellableTasks.get(taskType),
searchBackpressureStates.get(taskType)::incrementCancellationCount
)
)
tracker -> taskCancellations.addAll(tracker.getTaskCancellations(cancellableTasks.get(taskType)))
);
}

return taskCancellations;
}

/**
 * Looks up the resource usage trackers registered for the given task type.
 *
 * @param type task type used as the key into {@code taskTrackers}
 * @return the {@link TaskResourceUsageTrackers} stored under that key
 */
private TaskResourceUsageTrackers getTaskResourceUsageTrackersByType(Class<? extends SearchBackpressureTask> type) {
    final TaskResourceUsageTrackers trackersForType = taskTrackers.get(type);
    return trackersForType;
}

/**
* Method to reduce the taskCancellations into unique bunch
* @param taskCancellations
Expand Down Expand Up @@ -400,7 +409,7 @@ public void onTaskCompleted(Task task) {
}

List<Exception> exceptions = new ArrayList<>();
TaskResourceUsageTrackers trackers = getTaskResourceUsageTrackersByType(taskType);
TaskResourceUsageTrackers trackers = taskTrackers.get(taskType);
for (TaskResourceUsageTracker tracker : trackers.all()) {
try {
tracker.update(task);
Expand Down Expand Up @@ -440,7 +449,8 @@ public SearchBackpressureStats nodeStats() {
searchBackpressureStates.get(SearchTask.class).getCancellationCount(),
searchBackpressureStates.get(SearchTask.class).getLimitReachedCount(),
searchBackpressureStates.get(SearchTask.class).getCompletionCount(),
getTaskResourceUsageTrackersByType(SearchTask.class).all()
taskTrackers.get(SearchTask.class)
.all()
.stream()
.collect(Collectors.toUnmodifiableMap(t -> TaskResourceUsageTrackerType.fromName(t.name()), t -> t.stats(searchTasks)))
);
Expand All @@ -449,7 +459,8 @@ public SearchBackpressureStats nodeStats() {
searchBackpressureStates.get(SearchShardTask.class).getCancellationCount(),
searchBackpressureStates.get(SearchShardTask.class).getLimitReachedCount(),
searchBackpressureStates.get(SearchShardTask.class).getCompletionCount(),
getTaskResourceUsageTrackersByType(SearchShardTask.class).all()
taskTrackers.get(SearchShardTask.class)
.all()
.stream()
.collect(Collectors.toUnmodifiableMap(t -> TaskResourceUsageTrackerType.fromName(t.name()), t -> t.stats(searchShardTasks)))
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,11 @@ public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task
/**
* Method to get taskCancellations due to this tracker for the given {@link CancellableTask} tasks
* @param tasks
* @param cancellationCallback
* @return
*/
public List<TaskCancellation> getTaskCancellations(List<CancellableTask> tasks, Runnable cancellationCallback) {
public List<TaskCancellation> getTaskCancellations(List<CancellableTask> tasks) {
return tasks.stream()
.map(task -> this.getTaskCancellation(task, List.of(cancellationCallback, this::incrementCancellations)))
.map(task -> this.getTaskCancellation(task, List.of(this::incrementCancellations)))
.filter(TaskCancellation::isEligibleForCancellation)
.collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyDouble;
import static org.mockito.Mockito.anyList;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -485,6 +487,87 @@ public void testNonCancellationOfHeapBasedTasksWhenHeapNotInDuress() {
assertEquals(expectedStats, actualStats);
}

/**
 * Verifies that no task is cancelled, and that neither the SearchTask nor the SearchShardTask cancellation
 * count moves, when {@code isHeapUsageDominatedBySearch} reports that search traffic does not dominate heap
 * usage, even though the resource usage trackers themselves return cancellation reasons.
 */
public void testNonCancellationWhenSearchTrafficIsNotQualifyingForCancellation() {
    TaskManager mockTaskManager = spy(taskManager);
    TaskResourceTrackingService mockTaskResourceTrackingService = mock(TaskResourceTrackingService.class);
    AtomicLong mockTime = new AtomicLong(0);
    LongSupplier mockTimeNanosSupplier = mockTime::get;

    // Plain puts instead of double-brace initialisation, which would create an anonymous EnumMap subclass.
    EnumMap<ResourceType, NodeDuressTracker> duressTrackers = new EnumMap<>(ResourceType.class);
    duressTrackers.put(JVM, new NodeDuressTracker(() -> false, () -> 3));
    duressTrackers.put(CPU, new NodeDuressTracker(() -> true, () -> 3));

    NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(duressTrackers);

    // The heap tracker always returns a cancellation reason; the cpu tracker returns one only for tasks
    // whose cpu time is at least 400 nanos. Despite that, nothing should be cancelled in this test because
    // search traffic is stubbed as not dominating the heap usage.
    TaskResourceUsageTracker heapUsageTracker = getMockedTaskResourceUsageTracker(
        TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER,
        (task) -> Optional.of(new TaskCancellation.Reason("mem exceeded", 10))
    );
    TaskResourceUsageTracker cpuUsageTracker = getMockedTaskResourceUsageTracker(
        TaskResourceUsageTrackerType.CPU_USAGE_TRACKER,
        (task) -> {
            if (task.getTotalResourceStats().getCpuTimeInNanos() < 400) {
                return Optional.empty();
            }
            return Optional.of(new TaskCancellation.Reason("cpu time limit exceeded", 5));
        }
    );

    TaskResourceUsageTrackers taskResourceUsageTrackers = new TaskResourceUsageTrackers();
    taskResourceUsageTrackers.addTracker(cpuUsageTracker, TaskResourceUsageTrackerType.CPU_USAGE_TRACKER);
    taskResourceUsageTrackers.addTracker(heapUsageTracker, TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER);

    // Mocking 'settings' with predictable rate limiting thresholds.
    SearchBackpressureSettings settings = getBackpressureSettings("enforced", 0.1, 0.003, 10.0);

    SearchBackpressureService service = spy(
        new SearchBackpressureService(
            settings,
            mockTaskResourceTrackingService,
            threadPool,
            mockTimeNanosSupplier,
            nodeDuressTrackers,
            taskResourceUsageTrackers,
            new TaskResourceUsageTrackers(),
            mockTaskManager
        )
    );

    // 'service' is a spy: when(spy.method()) would invoke the real method (and through it the static
    // HeapUsageTracker check) while stubbing, so the doReturn(..).when(spy) form is used instead.
    doReturn(false).when(service).isHeapUsageDominatedBySearch(anyList(), anyDouble());

    // NOTE(review): presumably these two warm-up runs build up the duress streak towards the limit of 3
    // configured on the NodeDuressTrackers above - confirm against NodeDuressTracker.
    service.doRun();
    service.doRun();

    SearchTaskSettings searchTaskSettings = mock(SearchTaskSettings.class);
    // setting the total heap percent threshold to minimum so that circuit does not break in SearchBackpressureService
    when(searchTaskSettings.getTotalHeapPercentThreshold()).thenReturn(0.0);
    // 'settings' is a spy as well, so it is stubbed without calling the real getter.
    doReturn(searchTaskSettings).when(settings).getSearchTaskSettings();

    // Create a mix of low and high resource usage tasks (60 low + 15 high resource usage tasks).
    Map<Long, Task> activeSearchTasks = new HashMap<>();
    for (long i = 0; i < 75; i++) {
        Class<? extends CancellableTask> taskType = randomBoolean() ? SearchTask.class : SearchShardTask.class;
        if (i % 5 == 0) {
            activeSearchTasks.put(i, createMockTaskWithResourceStats(taskType, 500, 800, i));
        } else {
            activeSearchTasks.put(i, createMockTaskWithResourceStats(taskType, 100, 800, i));
        }
    }
    doReturn(activeSearchTasks).when(mockTaskResourceTrackingService).getResourceAwareTasks();

    // This run would normally trigger cancellations, but none should happen because the node is not in
    // duress because of search traffic.
    service.doRun();

    verify(mockTaskManager, times(0)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any());
    assertEquals(0, service.getSearchBackpressureState(SearchTask.class).getCancellationCount());
    assertEquals(0, service.getSearchBackpressureState(SearchShardTask.class).getCancellationCount());
}

private SearchBackpressureSettings getBackpressureSettings(String mode, double ratio, double rate, double burst) {
return spy(
new SearchBackpressureSettings(
Expand Down

0 comments on commit 0c7043d

Please sign in to comment.