From 405db06a1a2198dcad1f23ef00af77cc63b541ce Mon Sep 17 00:00:00 2001
From: "github-actions[bot]" <github-actions[bot]@users.noreply.github.com>
Date: Fri, 21 Jun 2024 16:08:04 +0000
Subject: [PATCH] Bug/sbp cancellation (#13474)

* change cancellation logic to fix disparity bw trackers and resource duress

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* add additional tests for searchBackpressureService and refactor code

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* add enumMap instead of list for tracking taskResourceUsageTrackets

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* add nodeNotInDuress test for nodeDuressTrackers class

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* address comments

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* add entry in CHANGELOG

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* address comments

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* address comments

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* remove wildcard import

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* streamline imports

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* address comments

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* add additional test case to test the circuit breaker for SBP logic

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* add missing javadoc to resourece type enum

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* add javadoc to a method

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* fix javadoc warnings

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* fix javadoc warnings

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

---------

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
(cherry picked from commit bcccedbba41df6e69cd73a118e4106c3a09f47ca)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
---
 CHANGELOG.md                                  |   1 +
 .../org/opensearch/search/ResourceType.java   |  41 +++
 .../SearchBackpressureService.java            | 241 ++++++++------
 .../stats/SearchShardTaskStats.java           |   2 +-
 .../backpressure/stats/SearchTaskStats.java   |   2 +-
 .../trackers/CpuUsageTracker.java             |  45 +--
 .../trackers/ElapsedTimeTracker.java          |  49 +--
 .../trackers/HeapUsageTracker.java            |  65 ++--
 .../trackers/NodeDuressTracker.java           |  41 ---
 .../trackers/NodeDuressTrackers.java          |  83 +++++
 .../trackers/TaskResourceUsageTracker.java    |  63 ----
 .../trackers/TaskResourceUsageTrackers.java   | 148 +++++++++
 .../opensearch/tasks/TaskCancellation.java    |  15 +
 .../SearchBackpressureServiceTests.java       | 308 +++++++++++++++---
 .../stats/SearchShardTaskStatsTests.java      |   2 +-
 .../stats/SearchTaskStatsTests.java           |   2 +-
 .../trackers/CpuUsageTrackerTests.java        |   6 +-
 .../trackers/ElapsedTimeTrackerTests.java     |   4 +-
 .../trackers/HeapUsageTrackerTests.java       |  21 +-
 .../trackers/NodeDuressTrackerTests.java      |  15 +-
 .../trackers/NodeDuressTrackersTests.java     |  84 +++++
 .../tasks/TaskCancellationTests.java          |   4 +-
 .../SearchBackpressureTestHelpers.java        |   8 +-
 23 files changed, 915 insertions(+), 335 deletions(-)
 create mode 100644 server/src/main/java/org/opensearch/search/ResourceType.java
 delete mode 100644 server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTracker.java
 create mode 100644 server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackers.java
 delete mode 100644 server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java
 create mode 100644 server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTrackers.java
 create mode 100644 server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackersTests.java

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d0a3a6dea671a..7ce7f1d851467 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -30,6 +30,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 ### Removed
 
 ### Fixed
+- Fix bug in SBP cancellation logic ([#13259](https://github.com/opensearch-project/OpenSearch/pull/13474))
 - Fix handling of Short and Byte data types in ScriptProcessor ingest pipeline ([#14379](https://github.com/opensearch-project/OpenSearch/issues/14379))
 - Switch to iterative version of WKT format parser ([#14086](https://github.com/opensearch-project/OpenSearch/pull/14086))
 - Fix the computed max shards of cluster to avoid int overflow ([#14155](https://github.com/opensearch-project/OpenSearch/pull/14155))
diff --git a/server/src/main/java/org/opensearch/search/ResourceType.java b/server/src/main/java/org/opensearch/search/ResourceType.java
new file mode 100644
index 0000000000000..5bbcd7de1c2ce
--- /dev/null
+++ b/server/src/main/java/org/opensearch/search/ResourceType.java
@@ -0,0 +1,41 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search;
+
+/**
+ * Enum to hold the resource type
+ */
+public enum ResourceType {
+    CPU("cpu"),
+    JVM("jvm");
+
+    private final String name;
+
+    ResourceType(String name) {
+        this.name = name;
+    }
+
+    /**
+     * The string match here is case-sensitive
+     * @param s name matching the resource type name
+     * @return a {@link ResourceType}
+     */
+    public static ResourceType fromName(String s) {
+        for (ResourceType resourceType : values()) {
+            if (resourceType.getName().equals(s)) {
+                return resourceType;
+            }
+        }
+        throw new IllegalArgumentException("Unknown resource type: [" + s + "]");
+    }
+
+    private String getName() {
+        return name;
+    }
+}
diff --git a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java
index fac1ac319087e..c51b3ae452be1 100644
--- a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java
+++ b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java
@@ -18,6 +18,7 @@
 import org.opensearch.common.settings.Setting;
 import org.opensearch.monitor.jvm.JvmStats;
 import org.opensearch.monitor.process.ProcessProbe;
+import org.opensearch.search.ResourceType;
 import org.opensearch.search.backpressure.settings.SearchBackpressureMode;
 import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
 import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
@@ -28,9 +29,11 @@
 import org.opensearch.search.backpressure.trackers.CpuUsageTracker;
 import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker;
 import org.opensearch.search.backpressure.trackers.HeapUsageTracker;
-import org.opensearch.search.backpressure.trackers.NodeDuressTracker;
-import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker;
+import org.opensearch.search.backpressure.trackers.NodeDuressTrackers;
+import org.opensearch.search.backpressure.trackers.NodeDuressTrackers.NodeDuressTracker;
 import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType;
+import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers;
+import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker;
 import org.opensearch.tasks.CancellableTask;
 import org.opensearch.tasks.SearchBackpressureTask;
 import org.opensearch.tasks.Task;
@@ -44,11 +47,13 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
+import java.util.EnumMap;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.DoubleSupplier;
+import java.util.function.Function;
 import java.util.function.LongSupplier;
 import java.util.stream.Collectors;
 
@@ -62,7 +67,14 @@
  */
 public class SearchBackpressureService extends AbstractLifecycleComponent implements TaskCompletionListener {
     private static final Logger logger = LogManager.getLogger(SearchBackpressureService.class);
-
+    private static final Map<TaskResourceUsageTrackerType, Function<NodeDuressTrackers, Boolean>> trackerApplyConditions = Map.of(
+        TaskResourceUsageTrackerType.CPU_USAGE_TRACKER,
+        (nodeDuressTrackers) -> nodeDuressTrackers.isResourceInDuress(ResourceType.CPU),
+        TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER,
+        (nodeDuressTrackers) -> isHeapTrackingSupported() && nodeDuressTrackers.isResourceInDuress(ResourceType.JVM),
+        TaskResourceUsageTrackerType.ELAPSED_TIME_TRACKER,
+        (nodeDuressTrackers) -> true
+    );
     private volatile Scheduler.Cancellable scheduledFuture;
 
     private final SearchBackpressureSettings settings;
@@ -70,8 +82,8 @@ public class SearchBackpressureService extends AbstractLifecycleComponent implem
     private final ThreadPool threadPool;
     private final LongSupplier timeNanosSupplier;
 
-    private final List<NodeDuressTracker> nodeDuressTrackers;
-    private final Map<Class<? extends SearchBackpressureTask>, List<TaskResourceUsageTracker>> taskTrackers;
+    private final NodeDuressTrackers nodeDuressTrackers;
+    private final Map<Class<? extends SearchBackpressureTask>, TaskResourceUsageTrackers> taskTrackers;
 
     private final Map<Class<? extends SearchBackpressureTask>, SearchBackpressureState> searchBackpressureStates;
     private final TaskManager taskManager;
@@ -82,19 +94,26 @@ public SearchBackpressureService(
         ThreadPool threadPool,
         TaskManager taskManager
     ) {
-        this(
-            settings,
-            taskResourceTrackingService,
-            threadPool,
-            System::nanoTime,
-            List.of(
-                new NodeDuressTracker(
-                    () -> ProcessProbe.getInstance().getProcessCpuPercent() / 100.0 >= settings.getNodeDuressSettings().getCpuThreshold()
-                ),
-                new NodeDuressTracker(
-                    () -> JvmStats.jvmStats().getMem().getHeapUsedPercent() / 100.0 >= settings.getNodeDuressSettings().getHeapThreshold()
-                )
-            ),
+        this(settings, taskResourceTrackingService, threadPool, System::nanoTime, new NodeDuressTrackers(new EnumMap<>(ResourceType.class) {
+            {
+                put(
+                    ResourceType.CPU,
+                    new NodeDuressTracker(
+                        () -> ProcessProbe.getInstance().getProcessCpuPercent() / 100.0 >= settings.getNodeDuressSettings()
+                            .getCpuThreshold(),
+                        () -> settings.getNodeDuressSettings().getNumSuccessiveBreaches()
+                    )
+                );
+                put(
+                    ResourceType.JVM,
+                    new NodeDuressTracker(
+                        () -> JvmStats.jvmStats().getMem().getHeapUsedPercent() / 100.0 >= settings.getNodeDuressSettings()
+                            .getHeapThreshold(),
+                        () -> settings.getNodeDuressSettings().getNumSuccessiveBreaches()
+                    )
+                );
+            }
+        }),
             getTrackers(
                 settings.getSearchTaskSettings()::getCpuTimeNanosThreshold,
                 settings.getSearchTaskSettings()::getHeapVarianceThreshold,
@@ -117,14 +136,14 @@ public SearchBackpressureService(
         );
     }
 
-    public SearchBackpressureService(
+    SearchBackpressureService(
         SearchBackpressureSettings settings,
         TaskResourceTrackingService taskResourceTrackingService,
         ThreadPool threadPool,
         LongSupplier timeNanosSupplier,
-        List<NodeDuressTracker> nodeDuressTrackers,
-        List<TaskResourceUsageTracker> searchTaskTrackers,
-        List<TaskResourceUsageTracker> searchShardTaskTrackers,
+        NodeDuressTrackers nodeDuressTrackers,
+        TaskResourceUsageTrackers searchTaskTrackers,
+        TaskResourceUsageTrackers searchShardTaskTrackers,
         TaskManager taskManager
     ) {
         this.settings = settings;
@@ -163,40 +182,48 @@ void doRun() {
             return;
         }
 
-        if (isNodeInDuress() == false) {
+        if (nodeDuressTrackers.isNodeInDuress() == false) {
             return;
         }
 
         List<CancellableTask> searchTasks = getTaskByType(SearchTask.class);
         List<CancellableTask> searchShardTasks = getTaskByType(SearchShardTask.class);
-        List<CancellableTask> cancellableTasks = new ArrayList<>();
+
+        boolean isHeapUsageDominatedBySearchTasks = isHeapUsageDominatedBySearch(
+            searchTasks,
+            getSettings().getSearchTaskSettings().getTotalHeapPercentThreshold()
+        );
+        boolean isHeapUsageDominatedBySearchShardTasks = isHeapUsageDominatedBySearch(
+            searchShardTasks,
+            getSettings().getSearchShardTaskSettings().getTotalHeapPercentThreshold()
+        );
+        final Map<Class<? extends SearchBackpressureTask>, List<CancellableTask>> cancellableTasks = Map.of(
+            SearchTask.class,
+            isHeapUsageDominatedBySearchTasks ? searchTasks : Collections.emptyList(),
+            SearchShardTask.class,
+            isHeapUsageDominatedBySearchShardTasks ? searchShardTasks : Collections.emptyList()
+        );
 
         // Force-refresh usage stats of these tasks before making a cancellation decision.
         taskResourceTrackingService.refreshResourceStats(searchTasks.toArray(new Task[0]));
         taskResourceTrackingService.refreshResourceStats(searchShardTasks.toArray(new Task[0]));
 
-        // Check if increase in heap usage is due to SearchTasks
-        if (HeapUsageTracker.isHeapUsageDominatedBySearch(
-            searchTasks,
-            getSettings().getSearchTaskSettings().getTotalHeapPercentThreshold()
-        )) {
-            cancellableTasks.addAll(searchTasks);
-        }
+        List<TaskCancellation> taskCancellations = new ArrayList<>();
 
-        // Check if increase in heap usage is due to SearchShardTasks
-        if (HeapUsageTracker.isHeapUsageDominatedBySearch(
-            searchShardTasks,
-            getSettings().getSearchShardTaskSettings().getTotalHeapPercentThreshold()
-        )) {
-            cancellableTasks.addAll(searchShardTasks);
+        for (TaskResourceUsageTrackerType trackerType : TaskResourceUsageTrackerType.values()) {
+            if (shouldApply(trackerType)) {
+                addResourceTrackerBasedCancellations(trackerType, taskCancellations, cancellableTasks);
+            }
         }
 
-        // none of the task type is breaching the heap usage thresholds and hence we do not cancel any tasks
-        if (cancellableTasks.isEmpty()) {
-            return;
-        }
+        // Since these cancellations might be duplicate due to multiple trackers causing cancellation for same task
+        // We need to merge them
+        taskCancellations = mergeTaskCancellations(taskCancellations).stream()
+            .map(this::addSBPStateUpdateCallback)
+            .filter(TaskCancellation::isEligibleForCancellation)
+            .collect(Collectors.toList());
 
-        for (TaskCancellation taskCancellation : getTaskCancellations(cancellableTasks)) {
+        for (TaskCancellation taskCancellation : taskCancellations) {
             logger.warn(
                 "[{} mode] cancelling task [{}] due to high resource consumption [{}]",
                 mode.getName(),
@@ -226,6 +253,66 @@ void doRun() {
         }
     }
 
+    /**
+     * Had to define this method to help mock this static method to test the scenario where SearchTraffic should not be
+     * penalised when not breaching the threshold
+     * @param searchTasks inFlight co-ordinator requests
+     * @param threshold   miniumum  jvm allocated bytes ratio w.r.t. available heap
+     * @return a boolean value based on whether the threshold is breached
+     */
+    boolean isHeapUsageDominatedBySearch(List<CancellableTask> searchTasks, double threshold) {
+        return HeapUsageTracker.isHeapUsageDominatedBySearch(searchTasks, threshold);
+    }
+
+    private TaskCancellation addSBPStateUpdateCallback(TaskCancellation taskCancellation) {
+        CancellableTask task = taskCancellation.getTask();
+        Runnable toAddCancellationCallbackForSBPState = searchBackpressureStates.get(SearchShardTask.class)::incrementCancellationCount;
+        if (task instanceof SearchTask) {
+            toAddCancellationCallbackForSBPState = searchBackpressureStates.get(SearchTask.class)::incrementCancellationCount;
+        }
+        List<Runnable> newOnCancelCallbacks = new ArrayList<>(taskCancellation.getOnCancelCallbacks());
+        newOnCancelCallbacks.add(toAddCancellationCallbackForSBPState);
+        return new TaskCancellation(task, taskCancellation.getReasons(), newOnCancelCallbacks);
+    }
+
+    private boolean shouldApply(TaskResourceUsageTrackerType trackerType) {
+        return trackerApplyConditions.get(trackerType).apply(nodeDuressTrackers);
+    }
+
+    private List<TaskCancellation> addResourceTrackerBasedCancellations(
+        TaskResourceUsageTrackerType type,
+        List<TaskCancellation> taskCancellations,
+        Map<Class<? extends SearchBackpressureTask>, List<CancellableTask>> cancellableTasks
+    ) {
+        for (Map.Entry<Class<? extends SearchBackpressureTask>, TaskResourceUsageTrackers> taskResourceUsageTrackers : taskTrackers
+            .entrySet()) {
+            final Optional<TaskResourceUsageTracker> taskResourceUsageTracker = taskResourceUsageTrackers.getValue().getTracker(type);
+            final Class<? extends SearchBackpressureTask> taskType = taskResourceUsageTrackers.getKey();
+
+            taskResourceUsageTracker.ifPresent(
+                tracker -> taskCancellations.addAll(tracker.getTaskCancellations(cancellableTasks.get(taskType)))
+            );
+        }
+
+        return taskCancellations;
+    }
+
+    /**
+     * Method to reduce the taskCancellations into unique bunch
+     * @param taskCancellations all task cancellations
+     * @return unique task cancellations
+     */
+    private List<TaskCancellation> mergeTaskCancellations(final List<TaskCancellation> taskCancellations) {
+        final Map<Long, TaskCancellation> uniqueTaskCancellations = new HashMap<>();
+
+        for (TaskCancellation taskCancellation : taskCancellations) {
+            final long taskId = taskCancellation.getTask().getId();
+            uniqueTaskCancellations.put(taskId, uniqueTaskCancellations.getOrDefault(taskId, taskCancellation).merge(taskCancellation));
+        }
+
+        return new ArrayList<>(uniqueTaskCancellations.values());
+    }
+
     /**
      * Given a task, returns the type of the task
      */
@@ -243,16 +330,7 @@ Class<? extends SearchBackpressureTask> getTaskType(Task task) {
      * Returns true if the node is in duress consecutively for the past 'n' observations.
      */
     boolean isNodeInDuress() {
-        boolean isNodeInDuress = false;
-        int numSuccessiveBreaches = getSettings().getNodeDuressSettings().getNumSuccessiveBreaches();
-
-        for (NodeDuressTracker tracker : nodeDuressTrackers) {
-            if (tracker.check() >= numSuccessiveBreaches) {
-                isNodeInDuress = true;  // not breaking the loop so that each tracker's streak gets updated.
-            }
-        }
-
-        return isNodeInDuress;
+        return nodeDuressTrackers.isNodeInDuress();
     }
 
     /*
@@ -271,39 +349,6 @@ <T extends CancellableTask & SearchBackpressureTask> List<CancellableTask> getTa
             .collect(Collectors.toUnmodifiableList());
     }
 
-    /**
-     * Returns a TaskCancellation wrapper containing the list of reasons (possibly zero), along with an overall
-     * cancellation score for the given task. Cancelling a task with a higher score has better chance of recovering the
-     * node from duress.
-     */
-    TaskCancellation getTaskCancellation(CancellableTask task) {
-        List<TaskCancellation.Reason> reasons = new ArrayList<>();
-        List<Runnable> callbacks = new ArrayList<>();
-        Class<? extends SearchBackpressureTask> taskType = getTaskType(task);
-        List<TaskResourceUsageTracker> trackers = taskTrackers.get(taskType);
-        for (TaskResourceUsageTracker tracker : trackers) {
-            Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);
-            if (reason.isPresent()) {
-                callbacks.add(tracker::incrementCancellations);
-                reasons.add(reason.get());
-            }
-        }
-        callbacks.add(searchBackpressureStates.get(taskType)::incrementCancellationCount);
-
-        return new TaskCancellation(task, reasons, callbacks);
-    }
-
-    /**
-     * Returns a list of TaskCancellations sorted by descending order of their cancellation scores.
-     */
-    List<TaskCancellation> getTaskCancellations(List<? extends CancellableTask> tasks) {
-        return tasks.stream()
-            .map(this::getTaskCancellation)
-            .filter(TaskCancellation::isEligibleForCancellation)
-            .sorted(Comparator.reverseOrder())
-            .collect(Collectors.toUnmodifiableList());
-    }
-
     SearchBackpressureSettings getSettings() {
         return settings;
     }
@@ -315,7 +360,7 @@ SearchBackpressureState getSearchBackpressureState(Class<? extends SearchBackpre
     /**
      * Given the threshold suppliers, returns the list of applicable trackers
      */
-    public static List<TaskResourceUsageTracker> getTrackers(
+    public static TaskResourceUsageTrackers getTrackers(
         LongSupplier cpuThresholdSupplier,
         DoubleSupplier heapVarianceSupplier,
         DoubleSupplier heapPercentThresholdSupplier,
@@ -324,23 +369,27 @@ public static List<TaskResourceUsageTracker> getTrackers(
         ClusterSettings clusterSettings,
         Setting<Integer> windowSizeSetting
     ) {
-        List<TaskResourceUsageTracker> trackers = new ArrayList<>();
-        trackers.add(new CpuUsageTracker(cpuThresholdSupplier));
+        TaskResourceUsageTrackers trackers = new TaskResourceUsageTrackers();
+        trackers.addTracker(new CpuUsageTracker(cpuThresholdSupplier), TaskResourceUsageTrackerType.CPU_USAGE_TRACKER);
         if (isHeapTrackingSupported()) {
-            trackers.add(
+            trackers.addTracker(
                 new HeapUsageTracker(
                     heapVarianceSupplier,
                     heapPercentThresholdSupplier,
                     heapMovingAverageWindowSize,
                     clusterSettings,
                     windowSizeSetting
-                )
+                ),
+                TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER
             );
         } else {
             logger.warn("heap size couldn't be determined");
         }
-        trackers.add(new ElapsedTimeTracker(ElapsedTimeNanosSupplier, System::nanoTime));
-        return Collections.unmodifiableList(trackers);
+        trackers.addTracker(
+            new ElapsedTimeTracker(ElapsedTimeNanosSupplier, System::nanoTime),
+            TaskResourceUsageTrackerType.ELAPSED_TIME_TRACKER
+        );
+        return trackers;
     }
 
     @Override
@@ -360,8 +409,8 @@ public void onTaskCompleted(Task task) {
         }
 
         List<Exception> exceptions = new ArrayList<>();
-        List<TaskResourceUsageTracker> trackers = taskTrackers.get(taskType);
-        for (TaskResourceUsageTracker tracker : trackers) {
+        TaskResourceUsageTrackers trackers = taskTrackers.get(taskType);
+        for (TaskResourceUsageTracker tracker : trackers.all()) {
             try {
                 tracker.update(task);
             } catch (Exception e) {
@@ -400,6 +449,7 @@ public SearchBackpressureStats nodeStats() {
             searchBackpressureStates.get(SearchTask.class).getCancellationCount(),
             searchBackpressureStates.get(SearchTask.class).getLimitReachedCount(),
             taskTrackers.get(SearchTask.class)
+                .all()
                 .stream()
                 .collect(Collectors.toUnmodifiableMap(t -> TaskResourceUsageTrackerType.fromName(t.name()), t -> t.stats(searchTasks)))
         );
@@ -408,6 +458,7 @@ public SearchBackpressureStats nodeStats() {
             searchBackpressureStates.get(SearchShardTask.class).getCancellationCount(),
             searchBackpressureStates.get(SearchShardTask.class).getLimitReachedCount(),
             taskTrackers.get(SearchShardTask.class)
+                .all()
                 .stream()
                 .collect(Collectors.toUnmodifiableMap(t -> TaskResourceUsageTrackerType.fromName(t.name()), t -> t.stats(searchShardTasks)))
         );
diff --git a/server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java b/server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java
index 678c19d83fb96..6c76f94f5c940 100644
--- a/server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java
+++ b/server/src/main/java/org/opensearch/search/backpressure/stats/SearchShardTaskStats.java
@@ -17,8 +17,8 @@
 import org.opensearch.search.backpressure.trackers.CpuUsageTracker;
 import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker;
 import org.opensearch.search.backpressure.trackers.HeapUsageTracker;
-import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker;
 import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType;
+import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker;
 
 import java.io.IOException;
 import java.util.Map;
diff --git a/server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java b/server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java
index 302350104bd3a..279f239703f3c 100644
--- a/server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java
+++ b/server/src/main/java/org/opensearch/search/backpressure/stats/SearchTaskStats.java
@@ -17,8 +17,8 @@
 import org.opensearch.search.backpressure.trackers.CpuUsageTracker;
 import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker;
 import org.opensearch.search.backpressure.trackers.HeapUsageTracker;
-import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker;
 import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType;
+import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker;
 
 import java.io.IOException;
 import java.util.Map;
diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java
index 507953cb4a20e..a303b625f4b59 100644
--- a/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java
+++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java
@@ -12,6 +12,7 @@
 import org.opensearch.core.common.io.stream.StreamInput;
 import org.opensearch.core.common.io.stream.StreamOutput;
 import org.opensearch.core.xcontent.XContentBuilder;
+import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker;
 import org.opensearch.tasks.Task;
 import org.opensearch.tasks.TaskCancellation;
 
@@ -34,7 +35,30 @@ public class CpuUsageTracker extends TaskResourceUsageTracker {
     private final LongSupplier thresholdSupplier;
 
     public CpuUsageTracker(LongSupplier thresholdSupplier) {
+        this(thresholdSupplier, (task) -> {
+            long usage = task.getTotalResourceStats().getCpuTimeInNanos();
+            long threshold = thresholdSupplier.getAsLong();
+
+            if (usage < threshold) {
+                return Optional.empty();
+            }
+
+            return Optional.of(
+                new TaskCancellation.Reason(
+                    "cpu usage exceeded ["
+                        + new TimeValue(usage, TimeUnit.NANOSECONDS)
+                        + " >= "
+                        + new TimeValue(threshold, TimeUnit.NANOSECONDS)
+                        + "]",
+                    1  // TODO: fine-tune the cancellation score/weight
+                )
+            );
+        });
+    }
+
+    public CpuUsageTracker(LongSupplier thresholdSupplier, ResourceUsageBreachEvaluator resourceUsageBreachEvaluator) {
         this.thresholdSupplier = thresholdSupplier;
+        this.resourceUsageBreachEvaluator = resourceUsageBreachEvaluator;
     }
 
     @Override
@@ -42,27 +66,6 @@ public String name() {
         return CPU_USAGE_TRACKER.getName();
     }
 
-    @Override
-    public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
-        long usage = task.getTotalResourceStats().getCpuTimeInNanos();
-        long threshold = thresholdSupplier.getAsLong();
-
-        if (usage < threshold) {
-            return Optional.empty();
-        }
-
-        return Optional.of(
-            new TaskCancellation.Reason(
-                "cpu usage exceeded ["
-                    + new TimeValue(usage, TimeUnit.NANOSECONDS)
-                    + " >= "
-                    + new TimeValue(threshold, TimeUnit.NANOSECONDS)
-                    + "]",
-                1  // TODO: fine-tune the cancellation score/weight
-            )
-        );
-    }
-
     @Override
     public TaskResourceUsageTracker.Stats stats(List<? extends Task> activeTasks) {
         long currentMax = activeTasks.stream().mapToLong(t -> t.getTotalResourceStats().getCpuTimeInNanos()).max().orElse(0);
diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java
index f1e8abe7e3230..216947315cd2d 100644
--- a/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java
+++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java
@@ -12,6 +12,7 @@
 import org.opensearch.core.common.io.stream.StreamInput;
 import org.opensearch.core.common.io.stream.StreamOutput;
 import org.opensearch.core.xcontent.XContentBuilder;
+import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker;
 import org.opensearch.tasks.Task;
 import org.opensearch.tasks.TaskCancellation;
 
@@ -34,8 +35,35 @@ public class ElapsedTimeTracker extends TaskResourceUsageTracker {
     private final LongSupplier timeNanosSupplier;
 
     public ElapsedTimeTracker(LongSupplier thresholdSupplier, LongSupplier timeNanosSupplier) {
+        this(thresholdSupplier, timeNanosSupplier, (Task task) -> {
+            long usage = timeNanosSupplier.getAsLong() - task.getStartTimeNanos();
+            long threshold = thresholdSupplier.getAsLong();
+
+            if (usage < threshold) {
+                return Optional.empty();
+            }
+
+            return Optional.of(
+                new TaskCancellation.Reason(
+                    "elapsed time exceeded ["
+                        + new TimeValue(usage, TimeUnit.NANOSECONDS)
+                        + " >= "
+                        + new TimeValue(threshold, TimeUnit.NANOSECONDS)
+                        + "]",
+                    1  // TODO: fine-tune the cancellation score/weight
+                )
+            );
+        });
+    }
+
+    public ElapsedTimeTracker(
+        LongSupplier thresholdSupplier,
+        LongSupplier timeNanosSupplier,
+        ResourceUsageBreachEvaluator resourceUsageBreachEvaluator
+    ) {
         this.thresholdSupplier = thresholdSupplier;
         this.timeNanosSupplier = timeNanosSupplier;
+        this.resourceUsageBreachEvaluator = resourceUsageBreachEvaluator;
     }
 
     @Override
@@ -43,27 +71,6 @@ public String name() {
         return ELAPSED_TIME_TRACKER.getName();
     }
 
-    @Override
-    public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
-        long usage = timeNanosSupplier.getAsLong() - task.getStartTimeNanos();
-        long threshold = thresholdSupplier.getAsLong();
-
-        if (usage < threshold) {
-            return Optional.empty();
-        }
-
-        return Optional.of(
-            new TaskCancellation.Reason(
-                "elapsed time exceeded ["
-                    + new TimeValue(usage, TimeUnit.NANOSECONDS)
-                    + " >= "
-                    + new TimeValue(threshold, TimeUnit.NANOSECONDS)
-                    + "]",
-                1  // TODO: fine-tune the cancellation score/weight
-            )
-        );
-    }
-
     @Override
     public TaskResourceUsageTracker.Stats stats(List<? extends Task> activeTasks) {
         long now = timeNanosSupplier.getAsLong();
diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java
index 56b9f947f6e37..c69de8ce21f89 100644
--- a/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java
+++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java
@@ -18,6 +18,7 @@
 import org.opensearch.core.common.unit.ByteSizeValue;
 import org.opensearch.core.xcontent.XContentBuilder;
 import org.opensearch.monitor.jvm.JvmStats;
+import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker;
 import org.opensearch.tasks.CancellableTask;
 import org.opensearch.tasks.Task;
 import org.opensearch.tasks.TaskCancellation;
@@ -55,6 +56,43 @@ public HeapUsageTracker(
         this.heapPercentThresholdSupplier = heapPercentThresholdSupplier;
         this.movingAverageReference = new AtomicReference<>(new MovingAverage(heapMovingAverageWindowSize));
         clusterSettings.addSettingsUpdateConsumer(windowSizeSetting, this::updateWindowSize);
+        setDefaultResourceUsageBreachEvaluator();
+    }
+
+    /**
+     * Had to refactor this method out of the constructor as we can't pass a lambda which references a member variable in constructor
+     * error: cannot reference movingAverageReference before supertype constructor has been called
+     */
+    private void setDefaultResourceUsageBreachEvaluator() {
+        this.resourceUsageBreachEvaluator = (task) -> {
+            MovingAverage movingAverage = movingAverageReference.get();
+
+            // There haven't been enough measurements.
+            if (movingAverage.isReady() == false) {
+                return Optional.empty();
+            }
+
+            double currentUsage = task.getTotalResourceStats().getMemoryInBytes();
+            double averageUsage = movingAverage.getAverage();
+            double variance = heapVarianceSupplier.getAsDouble();
+            double allowedUsage = averageUsage * variance;
+            double threshold = heapPercentThresholdSupplier.getAsDouble() * HEAP_SIZE_BYTES;
+
+            if (isHeapTrackingSupported() == false || currentUsage < threshold || currentUsage < allowedUsage) {
+                return Optional.empty();
+            }
+
+            return Optional.of(
+                new TaskCancellation.Reason(
+                    "heap usage exceeded ["
+                        + new ByteSizeValue((long) currentUsage)
+                        + " >= "
+                        + new ByteSizeValue((long) allowedUsage)
+                        + "]",
+                    (int) (currentUsage / averageUsage)  // TODO: fine-tune the cancellation score/weight
+                )
+            );
+        };
     }
 
     @Override
@@ -67,33 +105,6 @@ public void update(Task task) {
         movingAverageReference.get().record(task.getTotalResourceStats().getMemoryInBytes());
     }
 
-    @Override
-    public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
-        MovingAverage movingAverage = movingAverageReference.get();
-
-        // There haven't been enough measurements.
-        if (movingAverage.isReady() == false) {
-            return Optional.empty();
-        }
-
-        double currentUsage = task.getTotalResourceStats().getMemoryInBytes();
-        double averageUsage = movingAverage.getAverage();
-        double variance = heapVarianceSupplier.getAsDouble();
-        double allowedUsage = averageUsage * variance;
-        double threshold = heapPercentThresholdSupplier.getAsDouble() * HEAP_SIZE_BYTES;
-
-        if (isHeapTrackingSupported() == false || currentUsage < threshold || currentUsage < allowedUsage) {
-            return Optional.empty();
-        }
-
-        return Optional.of(
-            new TaskCancellation.Reason(
-                "heap usage exceeded [" + new ByteSizeValue((long) currentUsage) + " >= " + new ByteSizeValue((long) allowedUsage) + "]",
-                (int) (currentUsage / averageUsage)  // TODO: fine-tune the cancellation score/weight
-            )
-        );
-    }
-
     private void updateWindowSize(int heapMovingAverageWindowSize) {
         this.movingAverageReference.set(new MovingAverage(heapMovingAverageWindowSize));
     }
diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTracker.java
deleted file mode 100644
index 8e35c724a8fef..0000000000000
--- a/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTracker.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.search.backpressure.trackers;
-
-import org.opensearch.common.util.Streak;
-
-import java.util.function.BooleanSupplier;
-
-/**
- * NodeDuressTracker is used to check if the node is in duress.
- *
- * @opensearch.internal
- */
-public class NodeDuressTracker {
-    /**
-     * Tracks the number of consecutive breaches.
-     */
-    private final Streak breaches = new Streak();
-
-    /**
-     * Predicate that returns true when the node is in duress.
-     */
-    private final BooleanSupplier isNodeInDuress;
-
-    public NodeDuressTracker(BooleanSupplier isNodeInDuress) {
-        this.isNodeInDuress = isNodeInDuress;
-    }
-
-    /**
-     * Evaluates the predicate and returns the number of consecutive breaches.
-     */
-    public int check() {
-        return breaches.record(isNodeInDuress.getAsBoolean());
-    }
-}
diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackers.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackers.java
new file mode 100644
index 0000000000000..ae60a82fc2816
--- /dev/null
+++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackers.java
@@ -0,0 +1,83 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.backpressure.trackers;
+
+import org.opensearch.common.util.Streak;
+import org.opensearch.search.ResourceType;
+
+import java.util.Map;
+import java.util.function.BooleanSupplier;
+import java.util.function.IntSupplier;
+
+/**
+ * NodeDuressTrackers is used to check if the node is in duress based on various resources.
+ *
+ * @opensearch.internal
+ */
+public class NodeDuressTrackers {
+    private final Map<ResourceType, NodeDuressTracker> duressTrackers;
+
+    public NodeDuressTrackers(Map<ResourceType, NodeDuressTracker> duressTrackers) {
+        this.duressTrackers = duressTrackers;
+    }
+
+    /**
+     * Method to check the {@link ResourceType} in duress
+     * @return Boolean
+     */
+    public boolean isResourceInDuress(ResourceType resourceType) {
+        return duressTrackers.get(resourceType).test();
+    }
+
+    /**
+     * Method to evaluate whether the node is in duress or not
+     * @return true if node is in duress because of either system resource
+     */
+    public boolean isNodeInDuress() {
+        for (ResourceType resourceType : ResourceType.values()) {
+            if (isResourceInDuress(resourceType)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * NodeDuressTracker is used to check if the node is in duress
+     * @opensearch.internal
+     */
+    public static class NodeDuressTracker {
+        /**
+         * Tracks the number of consecutive breaches.
+         */
+        private final Streak breaches = new Streak();
+
+        /**
+         * Predicate that returns true when the node is in duress.
+         */
+        private final BooleanSupplier isNodeInDuress;
+
+        /**
+         * Predicate that returns the max number of breaches allowed for this resource before we mark it as in duress
+         */
+        private final IntSupplier maxBreachAllowedSupplier;
+
+        public NodeDuressTracker(BooleanSupplier isNodeInDuress, IntSupplier maxBreachAllowedSupplier) {
+            this.isNodeInDuress = isNodeInDuress;
+            this.maxBreachAllowedSupplier = maxBreachAllowedSupplier;
+        }
+
+        /**
+         * Returns true if the node is in duress consecutively for the past 'n' observations.
+         */
+        public boolean test() {
+            return breaches.record(isNodeInDuress.getAsBoolean()) >= maxBreachAllowedSupplier.getAsInt();
+        }
+    }
+}
diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java
deleted file mode 100644
index ce15e9e9b6622..0000000000000
--- a/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTracker.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.search.backpressure.trackers;
-
-import org.opensearch.core.common.io.stream.Writeable;
-import org.opensearch.core.xcontent.ToXContentObject;
-import org.opensearch.tasks.Task;
-import org.opensearch.tasks.TaskCancellation;
-
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * TaskResourceUsageTracker is used to track completions and cancellations of search related tasks.
- *
- * @opensearch.internal
- */
-public abstract class TaskResourceUsageTracker {
-    /**
-     * Counts the number of cancellations made due to this tracker.
-     */
-    private final AtomicLong cancellations = new AtomicLong();
-
-    public long incrementCancellations() {
-        return cancellations.incrementAndGet();
-    }
-
-    public long getCancellations() {
-        return cancellations.get();
-    }
-
-    /**
-     * Returns a unique name for this tracker.
-     */
-    public abstract String name();
-
-    /**
-     * Notifies the tracker to update its state when a task execution completes.
-     */
-    public void update(Task task) {}
-
-    /**
-     * Returns the cancellation reason for the given task, if it's eligible for cancellation.
-     */
-    public abstract Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task);
-
-    /**
-     * Returns the tracker's state for tasks as seen in the stats API.
-     */
-    public abstract Stats stats(List<? extends Task> activeTasks);
-
-    /**
-     * Represents the tracker's state as seen in the stats API.
-     */
-    public interface Stats extends ToXContentObject, Writeable {}
-}
diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTrackers.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTrackers.java
new file mode 100644
index 0000000000000..3b0072288681c
--- /dev/null
+++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/TaskResourceUsageTrackers.java
@@ -0,0 +1,148 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.backpressure.trackers;
+
+import org.opensearch.core.common.io.stream.Writeable;
+import org.opensearch.core.xcontent.ToXContentObject;
+import org.opensearch.tasks.CancellableTask;
+import org.opensearch.tasks.Task;
+import org.opensearch.tasks.TaskCancellation;
+
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+/**
+ * TaskResourceUsageTrackers is used to hold all the {@link TaskResourceUsageTracker} objects.
+ *
+ * @opensearch.internal
+ */
+public class TaskResourceUsageTrackers {
+    private final EnumMap<TaskResourceUsageTrackerType, TaskResourceUsageTracker> all;
+
+    public TaskResourceUsageTrackers() {
+        all = new EnumMap<>(TaskResourceUsageTrackerType.class);
+    }
+
+    /**
+     * adds the tracker for the TrackerType
+     * @param tracker is {@link TaskResourceUsageTracker} implementation which will be added
+     * @param trackerType is {@link TaskResourceUsageTrackerType} which depicts the implementation type
+     */
+    public void addTracker(final TaskResourceUsageTracker tracker, final TaskResourceUsageTrackerType trackerType) {
+        all.put(trackerType, tracker);
+    }
+
+    /**
+     * getter for tracker for a {@link TaskResourceUsageTrackerType}
+     * @param type for which the implementation is returned
+     * @return the {@link TaskResourceUsageTrackerType}
+     */
+    public Optional<TaskResourceUsageTracker> getTracker(TaskResourceUsageTrackerType type) {
+        return Optional.ofNullable(all.get(type));
+    }
+
+    /**
+     * Method to access all available {@link TaskResourceUsageTracker}
+     * @return all enabled and available {@link TaskResourceUsageTracker}s
+     */
+    public List<TaskResourceUsageTracker> all() {
+        return new ArrayList<>(all.values());
+    }
+
+    /**
+     * TaskResourceUsageTracker is used to track completions and cancellations of search related tasks.
+     * @opensearch.internal
+     */
+    public static abstract class TaskResourceUsageTracker {
+        /**
+         * Counts the number of cancellations made due to this tracker.
+         */
+        private final AtomicLong cancellations = new AtomicLong();
+        protected ResourceUsageBreachEvaluator resourceUsageBreachEvaluator;
+
+        /**
+         * for test purposes only
+         * @param resourceUsageBreachEvaluator which suggests whether a task should be cancelled or not
+         */
+        public void setResourceUsageBreachEvaluator(final ResourceUsageBreachEvaluator resourceUsageBreachEvaluator) {
+            this.resourceUsageBreachEvaluator = resourceUsageBreachEvaluator;
+        }
+
+        public long incrementCancellations() {
+            return cancellations.incrementAndGet();
+        }
+
+        public long getCancellations() {
+            return cancellations.get();
+        }
+
+        /**
+         * Returns a unique name for this tracker.
+         */
+        public abstract String name();
+
+        /**
+         * Notifies the tracker to update its state when a task execution completes.
+         */
+        public void update(Task task) {}
+
+        /**
+         * Returns the cancellation reason for the given task, if it's eligible for cancellation.
+         */
+        public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
+            return resourceUsageBreachEvaluator.evaluate(task);
+        }
+
+        /**
+         * Returns the tracker's state for tasks as seen in the stats API.
+         */
+        public abstract Stats stats(List<? extends Task> activeTasks);
+
+        /**
+         * Method to get taskCancellations due to this tracker for the given {@link CancellableTask} tasks
+         * @param tasks cancellation eligible tasks due to node duress and search traffic threshold breach
+         * @return the list of tasks which are breaching task level thresholds for this {@link TaskResourceUsageTracker}
+         */
+        public List<TaskCancellation> getTaskCancellations(List<CancellableTask> tasks) {
+            return tasks.stream()
+                .map(task -> this.getTaskCancellation(task, List.of(this::incrementCancellations)))
+                .filter(TaskCancellation::isEligibleForCancellation)
+                .collect(Collectors.toList());
+        }
+
+        private TaskCancellation getTaskCancellation(final CancellableTask task, final List<Runnable> cancellationCallback) {
+            Optional<TaskCancellation.Reason> reason = checkAndMaybeGetCancellationReason(task);
+            List<TaskCancellation.Reason> reasons = new ArrayList<>();
+            reason.ifPresent(reasons::add);
+
+            return new TaskCancellation(task, reasons, cancellationCallback);
+        }
+
+        /**
+         * Represents the tracker's state as seen in the stats API.
+         */
+        public interface Stats extends ToXContentObject, Writeable {}
+
+        /**
+         * This interface carries the logic to decide whether a task should be cancelled or not
+         */
+        public interface ResourceUsageBreachEvaluator {
+            /**
+             * evaluates whether the task is eligible for cancellation based on {@link TaskResourceUsageTracker} implementation
+             * @param task is input to this method on which the cancellation evaluation is performed
+             * @return a {@link TaskCancellation.Reason} why this task should be cancelled otherwise empty
+             */
+            public Optional<TaskCancellation.Reason> evaluate(final Task task);
+        }
+    }
+}
diff --git a/server/src/main/java/org/opensearch/tasks/TaskCancellation.java b/server/src/main/java/org/opensearch/tasks/TaskCancellation.java
index 2d152e513f197..872f5b79bb205 100644
--- a/server/src/main/java/org/opensearch/tasks/TaskCancellation.java
+++ b/server/src/main/java/org/opensearch/tasks/TaskCancellation.java
@@ -42,10 +42,25 @@ public List<Reason> getReasons() {
         return reasons;
     }
 
+    public List<Runnable> getOnCancelCallbacks() {
+        return onCancelCallbacks;
+    }
+
     public String getReasonString() {
         return reasons.stream().map(Reason::getMessage).collect(Collectors.joining(", "));
     }
 
+    public TaskCancellation merge(final TaskCancellation other) {
+        if (other == this) {
+            return this;
+        }
+        final List<Reason> newReasons = new ArrayList<>(reasons);
+        newReasons.addAll(other.getReasons());
+        final List<Runnable> newOnCancelCallbacks = new ArrayList<>(onCancelCallbacks);
+        newOnCancelCallbacks.addAll(other.onCancelCallbacks);
+        return new TaskCancellation(task, newReasons, newOnCancelCallbacks);
+    }
+
     /**
      * Cancels the task and invokes all onCancelCallbacks.
      */
diff --git a/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java b/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java
index f0d930c4c3acb..d5194df5dfb41 100644
--- a/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java
+++ b/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java
@@ -16,6 +16,7 @@
 import org.opensearch.core.common.io.stream.StreamInput;
 import org.opensearch.core.common.io.stream.StreamOutput;
 import org.opensearch.core.xcontent.XContentBuilder;
+import org.opensearch.search.ResourceType;
 import org.opensearch.search.backpressure.settings.SearchBackpressureMode;
 import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
 import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
@@ -23,9 +24,11 @@
 import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
 import org.opensearch.search.backpressure.stats.SearchShardTaskStats;
 import org.opensearch.search.backpressure.stats.SearchTaskStats;
-import org.opensearch.search.backpressure.trackers.NodeDuressTracker;
-import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker;
+import org.opensearch.search.backpressure.trackers.NodeDuressTrackers;
+import org.opensearch.search.backpressure.trackers.NodeDuressTrackers.NodeDuressTracker;
 import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType;
+import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers;
+import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker;
 import org.opensearch.tasks.CancellableTask;
 import org.opensearch.tasks.Task;
 import org.opensearch.tasks.TaskCancellation;
@@ -42,6 +45,7 @@
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -52,10 +56,14 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.LongSupplier;
 
+import static org.opensearch.search.ResourceType.CPU;
+import static org.opensearch.search.ResourceType.JVM;
 import static org.opensearch.search.backpressure.SearchBackpressureTestHelpers.createMockTaskWithResourceStats;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyDouble;
+import static org.mockito.Mockito.anyList;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -89,8 +97,15 @@ public void testIsNodeInDuress() {
 
         AtomicReference<Double> cpuUsage = new AtomicReference<>();
         AtomicReference<Double> heapUsage = new AtomicReference<>();
-        NodeDuressTracker cpuUsageTracker = new NodeDuressTracker(() -> cpuUsage.get() >= 0.5);
-        NodeDuressTracker heapUsageTracker = new NodeDuressTracker(() -> heapUsage.get() >= 0.5);
+        NodeDuressTracker cpuUsageTracker = new NodeDuressTracker(() -> cpuUsage.get() >= 0.5, () -> 3);
+        NodeDuressTracker heapUsageTracker = new NodeDuressTracker(() -> heapUsage.get() >= 0.5, () -> 3);
+
+        EnumMap<ResourceType, NodeDuressTracker> duressTrackers = new EnumMap<>(ResourceType.class) {
+            {
+                put(ResourceType.JVM, heapUsageTracker);
+                put(ResourceType.CPU, cpuUsageTracker);
+            }
+        };
 
         SearchBackpressureSettings settings = new SearchBackpressureSettings(
             Settings.EMPTY,
@@ -102,9 +117,9 @@ public void testIsNodeInDuress() {
             mockTaskResourceTrackingService,
             threadPool,
             System::nanoTime,
-            List.of(cpuUsageTracker, heapUsageTracker),
-            Collections.emptyList(),
-            Collections.emptyList(),
+            new NodeDuressTrackers(duressTrackers),
+            new TaskResourceUsageTrackers(),
+            new TaskResourceUsageTrackers(),
             taskManager
         );
 
@@ -132,6 +147,8 @@ public void testTrackerStateUpdateOnSearchTaskCompletion() {
         TaskResourceTrackingService mockTaskResourceTrackingService = mock(TaskResourceTrackingService.class);
         LongSupplier mockTimeNanosSupplier = () -> TimeUnit.SECONDS.toNanos(1234);
         TaskResourceUsageTracker mockTaskResourceUsageTracker = mock(TaskResourceUsageTracker.class);
+        TaskResourceUsageTrackers taskResourceUsageTrackers = new TaskResourceUsageTrackers();
+        taskResourceUsageTrackers.addTracker(mockTaskResourceUsageTracker, TaskResourceUsageTrackerType.CPU_USAGE_TRACKER);
 
         SearchBackpressureSettings settings = new SearchBackpressureSettings(
             Settings.EMPTY,
@@ -143,15 +160,15 @@ public void testTrackerStateUpdateOnSearchTaskCompletion() {
             mockTaskResourceTrackingService,
             threadPool,
             mockTimeNanosSupplier,
-            Collections.emptyList(),
-            List.of(mockTaskResourceUsageTracker),
-            Collections.emptyList(),
+            new NodeDuressTrackers(new EnumMap<>(ResourceType.class)),
+            taskResourceUsageTrackers,
+            new TaskResourceUsageTrackers(),
             taskManager
         );
 
         for (int i = 0; i < 100; i++) {
             // service.onTaskCompleted(new SearchTask(1, "test", "test", () -> "Test", TaskId.EMPTY_TASK_ID, new HashMap<>()));
-            service.onTaskCompleted(createMockTaskWithResourceStats(SearchTask.class, 100, 200));
+            service.onTaskCompleted(createMockTaskWithResourceStats(SearchTask.class, 100, 200, i));
         }
         assertEquals(100, service.getSearchBackpressureState(SearchTask.class).getCompletionCount());
         verify(mockTaskResourceUsageTracker, times(100)).update(any());
@@ -161,6 +178,8 @@ public void testTrackerStateUpdateOnSearchShardTaskCompletion() {
         TaskResourceTrackingService mockTaskResourceTrackingService = mock(TaskResourceTrackingService.class);
         LongSupplier mockTimeNanosSupplier = () -> TimeUnit.SECONDS.toNanos(1234);
         TaskResourceUsageTracker mockTaskResourceUsageTracker = mock(TaskResourceUsageTracker.class);
+        TaskResourceUsageTrackers taskResourceUsageTrackers = new TaskResourceUsageTrackers();
+        taskResourceUsageTrackers.addTracker(mockTaskResourceUsageTracker, TaskResourceUsageTrackerType.CPU_USAGE_TRACKER);
 
         SearchBackpressureSettings settings = new SearchBackpressureSettings(
             Settings.EMPTY,
@@ -172,16 +191,16 @@ public void testTrackerStateUpdateOnSearchShardTaskCompletion() {
             mockTaskResourceTrackingService,
             threadPool,
             mockTimeNanosSupplier,
-            Collections.emptyList(),
-            Collections.emptyList(),
-            List.of(mockTaskResourceUsageTracker),
+            new NodeDuressTrackers(new EnumMap<>(ResourceType.class)),
+            new TaskResourceUsageTrackers(),
+            taskResourceUsageTrackers,
             taskManager
         );
 
         // Record task completions to update the tracker state. Tasks other than SearchTask & SearchShardTask are ignored.
-        service.onTaskCompleted(createMockTaskWithResourceStats(CancellableTask.class, 100, 200));
+        service.onTaskCompleted(createMockTaskWithResourceStats(CancellableTask.class, 100, 200, 101));
         for (int i = 0; i < 100; i++) {
-            service.onTaskCompleted(createMockTaskWithResourceStats(SearchShardTask.class, 100, 200));
+            service.onTaskCompleted(createMockTaskWithResourceStats(SearchShardTask.class, 100, 200, i));
         }
         assertEquals(100, service.getSearchBackpressureState(SearchShardTask.class).getCompletionCount());
         verify(mockTaskResourceUsageTracker, times(100)).update(any());
@@ -192,21 +211,41 @@ public void testSearchTaskInFlightCancellation() {
         TaskResourceTrackingService mockTaskResourceTrackingService = mock(TaskResourceTrackingService.class);
         AtomicLong mockTime = new AtomicLong(0);
         LongSupplier mockTimeNanosSupplier = mockTime::get;
-        NodeDuressTracker mockNodeDuressTracker = new NodeDuressTracker(() -> true);
+        NodeDuressTracker mockNodeDuressTracker = new NodeDuressTracker(() -> true, () -> 3);
 
-        TaskResourceUsageTracker mockTaskResourceUsageTracker = getMockedTaskResourceUsageTracker();
+        TaskResourceUsageTracker mockTaskResourceUsageTracker = getMockedTaskResourceUsageTracker(
+            TaskResourceUsageTrackerType.CPU_USAGE_TRACKER,
+            (task) -> {
+                if (task.getTotalResourceStats().getCpuTimeInNanos() < 300) {
+                    return Optional.empty();
+                }
+
+                return Optional.of(new TaskCancellation.Reason("limits exceeded", 5));
+            }
+        );
+        TaskResourceUsageTrackers taskResourceUsageTrackers = new TaskResourceUsageTrackers();
+        taskResourceUsageTrackers.addTracker(mockTaskResourceUsageTracker, TaskResourceUsageTrackerType.CPU_USAGE_TRACKER);
 
         // Mocking 'settings' with predictable rate limiting thresholds.
         SearchBackpressureSettings settings = getBackpressureSettings("enforced", 0.1, 0.003, 5.0);
 
+        NodeDuressTracker heapUsageTracker = new NodeDuressTracker(() -> false, () -> 3);
+
+        EnumMap<ResourceType, NodeDuressTracker> duressTrackers = new EnumMap<>(ResourceType.class) {
+            {
+                put(JVM, heapUsageTracker);
+                put(CPU, mockNodeDuressTracker);
+            }
+        };
+
         SearchBackpressureService service = new SearchBackpressureService(
             settings,
             mockTaskResourceTrackingService,
             threadPool,
             mockTimeNanosSupplier,
-            List.of(mockNodeDuressTracker),
-            List.of(mockTaskResourceUsageTracker),
-            Collections.emptyList(),
+            new NodeDuressTrackers(duressTrackers),
+            taskResourceUsageTrackers,
+            new TaskResourceUsageTrackers(),
             mockTaskManager
         );
 
@@ -225,9 +264,9 @@ public void testSearchTaskInFlightCancellation() {
         Map<Long, Task> activeSearchTasks = new HashMap<>();
         for (long i = 0; i < 75; i++) {
             if (i % 3 == 0) {
-                activeSearchTasks.put(i, createMockTaskWithResourceStats(SearchTask.class, 500, taskHeapUsageBytes));
+                activeSearchTasks.put(i, createMockTaskWithResourceStats(SearchTask.class, 500, taskHeapUsageBytes, i));
             } else {
-                activeSearchTasks.put(i, createMockTaskWithResourceStats(SearchTask.class, 100, taskHeapUsageBytes));
+                activeSearchTasks.put(i, createMockTaskWithResourceStats(SearchTask.class, 100, taskHeapUsageBytes, i));
             }
         }
         doReturn(activeSearchTasks).when(mockTaskResourceTrackingService).getResourceAwareTasks();
@@ -264,9 +303,28 @@ public void testSearchShardTaskInFlightCancellation() {
         TaskResourceTrackingService mockTaskResourceTrackingService = mock(TaskResourceTrackingService.class);
         AtomicLong mockTime = new AtomicLong(0);
         LongSupplier mockTimeNanosSupplier = mockTime::get;
-        NodeDuressTracker mockNodeDuressTracker = new NodeDuressTracker(() -> true);
+        NodeDuressTracker mockNodeDuressTracker = new NodeDuressTracker(() -> true, () -> 3);
 
-        TaskResourceUsageTracker mockTaskResourceUsageTracker = getMockedTaskResourceUsageTracker();
+        EnumMap<ResourceType, NodeDuressTracker> duressTrackers = new EnumMap<>(ResourceType.class) {
+            {
+                put(JVM, new NodeDuressTracker(() -> false, () -> 3));
+                put(CPU, mockNodeDuressTracker);
+            }
+        };
+        NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(duressTrackers);
+
+        TaskResourceUsageTracker mockTaskResourceUsageTracker = getMockedTaskResourceUsageTracker(
+            TaskResourceUsageTrackerType.CPU_USAGE_TRACKER,
+            (task) -> {
+                if (task.getTotalResourceStats().getCpuTimeInNanos() < 300) {
+                    return Optional.empty();
+                }
+
+                return Optional.of(new TaskCancellation.Reason("limits exceeded", 5));
+            }
+        );
+        TaskResourceUsageTrackers taskResourceUsageTrackers = new TaskResourceUsageTrackers();
+        taskResourceUsageTrackers.addTracker(mockTaskResourceUsageTracker, TaskResourceUsageTrackerType.CPU_USAGE_TRACKER);
 
         // Mocking 'settings' with predictable rate limiting thresholds.
         SearchBackpressureSettings settings = getBackpressureSettings("enforced", 0.1, 0.003, 10.0);
@@ -276,9 +334,9 @@ public void testSearchShardTaskInFlightCancellation() {
             mockTaskResourceTrackingService,
             threadPool,
             mockTimeNanosSupplier,
-            List.of(mockNodeDuressTracker),
-            Collections.emptyList(),
-            List.of(mockTaskResourceUsageTracker),
+            nodeDuressTrackers,
+            new TaskResourceUsageTrackers(),
+            taskResourceUsageTrackers,
             mockTaskManager
         );
 
@@ -297,9 +355,9 @@ public void testSearchShardTaskInFlightCancellation() {
         Map<Long, Task> activeSearchShardTasks = new HashMap<>();
         for (long i = 0; i < 75; i++) {
             if (i % 5 == 0) {
-                activeSearchShardTasks.put(i, createMockTaskWithResourceStats(SearchShardTask.class, 500, taskHeapUsageBytes));
+                activeSearchShardTasks.put(i, createMockTaskWithResourceStats(SearchShardTask.class, 500, taskHeapUsageBytes, i));
             } else {
-                activeSearchShardTasks.put(i, createMockTaskWithResourceStats(SearchShardTask.class, 100, taskHeapUsageBytes));
+                activeSearchShardTasks.put(i, createMockTaskWithResourceStats(SearchShardTask.class, 100, taskHeapUsageBytes, i));
             }
         }
         doReturn(activeSearchShardTasks).when(mockTaskResourceTrackingService).getResourceAwareTasks();
@@ -317,7 +375,7 @@ public void testSearchShardTaskInFlightCancellation() {
         // Simulate task completion to replenish some tokens.
         // This will add 2 tokens (task count delta * cancellationRatio) to 'rateLimitPerTaskCompletion'.
         for (int i = 0; i < 20; i++) {
-            service.onTaskCompleted(createMockTaskWithResourceStats(SearchShardTask.class, 100, taskHeapUsageBytes));
+            service.onTaskCompleted(createMockTaskWithResourceStats(SearchShardTask.class, 100, taskHeapUsageBytes, i));
         }
         service.doRun();
         verify(mockTaskManager, times(12)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any());
@@ -333,6 +391,181 @@ public void testSearchShardTaskInFlightCancellation() {
         assertEquals(expectedStats, actualStats);
     }
 
+    public void testNonCancellationOfHeapBasedTasksWhenHeapNotInDuress() {
+        TaskManager mockTaskManager = spy(taskManager);
+        TaskResourceTrackingService mockTaskResourceTrackingService = mock(TaskResourceTrackingService.class);
+        AtomicLong mockTime = new AtomicLong(0);
+        LongSupplier mockTimeNanosSupplier = mockTime::get;
+
+        EnumMap<ResourceType, NodeDuressTracker> duressTrackers = new EnumMap<>(ResourceType.class) {
+            {
+                put(JVM, new NodeDuressTracker(() -> false, () -> 3));
+                put(CPU, new NodeDuressTracker(() -> true, () -> 3));
+            }
+        };
+
+        NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(duressTrackers);
+
+        // Creating heap and cpu usage trackers where heap tracker will always evaluate with reasons to cancel the
+        // tasks but heap based cancellation should not happen because heap is not in duress
+        TaskResourceUsageTracker heapUsageTracker = getMockedTaskResourceUsageTracker(
+            TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER,
+            (task) -> Optional.of(new TaskCancellation.Reason("mem exceeded", 10))
+        );
+        TaskResourceUsageTracker cpuUsageTracker = getMockedTaskResourceUsageTracker(
+            TaskResourceUsageTrackerType.CPU_USAGE_TRACKER,
+            (task) -> {
+                if (task.getTotalResourceStats().getCpuTimeInNanos() < 400) {
+                    return Optional.empty();
+                }
+                return Optional.of(new TaskCancellation.Reason("cpu time limit exceeded", 5));
+            }
+        );
+
+        TaskResourceUsageTrackers taskResourceUsageTrackers = new TaskResourceUsageTrackers();
+        taskResourceUsageTrackers.addTracker(cpuUsageTracker, TaskResourceUsageTrackerType.CPU_USAGE_TRACKER);
+        taskResourceUsageTrackers.addTracker(heapUsageTracker, TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER);
+
+        // Mocking 'settings' with predictable rate limiting thresholds.
+        SearchBackpressureSettings settings = getBackpressureSettings("enforced", 0.1, 0.003, 10.0);
+
+        SearchBackpressureService service = new SearchBackpressureService(
+            settings,
+            mockTaskResourceTrackingService,
+            threadPool,
+            mockTimeNanosSupplier,
+            nodeDuressTrackers,
+            taskResourceUsageTrackers,
+            new TaskResourceUsageTrackers(),
+            mockTaskManager
+        );
+
+        service.doRun();
+        service.doRun();
+
+        SearchTaskSettings searchTaskSettings = mock(SearchTaskSettings.class);
+        // setting the total heap percent threshold to minimum so that circuit does not break in SearchBackpressureService
+        when(searchTaskSettings.getTotalHeapPercentThreshold()).thenReturn(0.0);
+        when(settings.getSearchTaskSettings()).thenReturn(searchTaskSettings);
+
+        // Create a mix of low and high resource usage tasks (60 low + 15 high resource usage tasks).
+        Map<Long, Task> activeSearchTasks = new HashMap<>();
+        for (long i = 0; i < 75; i++) {
+            if (i % 5 == 0) {
+                activeSearchTasks.put(i, createMockTaskWithResourceStats(SearchTask.class, 500, 800, i));
+            } else {
+                activeSearchTasks.put(i, createMockTaskWithResourceStats(SearchTask.class, 100, 800, i));
+            }
+        }
+        doReturn(activeSearchTasks).when(mockTaskResourceTrackingService).getResourceAwareTasks();
+
+        // this will trigger cancellation but these cancellation should only be cpu based
+        service.doRun();
+        verify(mockTaskManager, times(5)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any());
+        assertEquals(5, service.getSearchBackpressureState(SearchTask.class).getCancellationCount());
+        assertEquals(1, service.getSearchBackpressureState(SearchTask.class).getLimitReachedCount());
+
+        SearchBackpressureStats expectedStats = new SearchBackpressureStats(
+            new SearchTaskStats(
+                5,
+                1,
+                0,
+                Map.of(
+                    TaskResourceUsageTrackerType.CPU_USAGE_TRACKER,
+                    new MockStats(5),
+                    TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER,
+                    new MockStats(0)
+                )
+            ),
+            new SearchShardTaskStats(0, 0, 0, Collections.emptyMap()),
+            SearchBackpressureMode.ENFORCED
+        );
+
+        SearchBackpressureStats actualStats = service.nodeStats();
+        assertEquals(expectedStats, actualStats);
+    }
+
+    public void testNonCancellationWhenSearchTrafficIsNotQualifyingForCancellation() {
+        TaskManager mockTaskManager = spy(taskManager);
+        TaskResourceTrackingService mockTaskResourceTrackingService = mock(TaskResourceTrackingService.class);
+        AtomicLong mockTime = new AtomicLong(0);
+        LongSupplier mockTimeNanosSupplier = mockTime::get;
+
+        EnumMap<ResourceType, NodeDuressTracker> duressTrackers = new EnumMap<>(ResourceType.class) {
+            {
+                put(JVM, new NodeDuressTracker(() -> false, () -> 3));
+                put(CPU, new NodeDuressTracker(() -> true, () -> 3));
+            }
+        };
+
+        NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(duressTrackers);
+
+        // Creating heap and cpu usage trackers where heap tracker will always evaluate with reasons to cancel the
+        // tasks but heap based cancellation should not happen because heap is not in duress
+        TaskResourceUsageTracker heapUsageTracker = getMockedTaskResourceUsageTracker(
+            TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER,
+            (task) -> Optional.of(new TaskCancellation.Reason("mem exceeded", 10))
+        );
+        TaskResourceUsageTracker cpuUsageTracker = getMockedTaskResourceUsageTracker(
+            TaskResourceUsageTrackerType.CPU_USAGE_TRACKER,
+            (task) -> {
+                if (task.getTotalResourceStats().getCpuTimeInNanos() < 400) {
+                    return Optional.empty();
+                }
+                return Optional.of(new TaskCancellation.Reason("cpu time limit exceeded", 5));
+            }
+        );
+
+        TaskResourceUsageTrackers taskResourceUsageTrackers = new TaskResourceUsageTrackers();
+        taskResourceUsageTrackers.addTracker(cpuUsageTracker, TaskResourceUsageTrackerType.CPU_USAGE_TRACKER);
+        taskResourceUsageTrackers.addTracker(heapUsageTracker, TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER);
+
+        // Mocking 'settings' with predictable rate limiting thresholds.
+        SearchBackpressureSettings settings = getBackpressureSettings("enforced", 0.1, 0.003, 10.0);
+
+        SearchBackpressureService service = spy(
+            new SearchBackpressureService(
+                settings,
+                mockTaskResourceTrackingService,
+                threadPool,
+                mockTimeNanosSupplier,
+                nodeDuressTrackers,
+                taskResourceUsageTrackers,
+                new TaskResourceUsageTrackers(),
+                mockTaskManager
+            )
+        );
+
+        when(service.isHeapUsageDominatedBySearch(anyList(), anyDouble())).thenReturn(false);
+
+        service.doRun();
+        service.doRun();
+
+        SearchTaskSettings searchTaskSettings = mock(SearchTaskSettings.class);
+        // setting the total heap percent threshold to minimum so that circuit does not break in SearchBackpressureService
+        when(searchTaskSettings.getTotalHeapPercentThreshold()).thenReturn(0.0);
+        when(settings.getSearchTaskSettings()).thenReturn(searchTaskSettings);
+
+        // Create a mix of low and high resource usage tasks (60 low + 15 high resource usage tasks).
+        Map<Long, Task> activeSearchTasks = new HashMap<>();
+        for (long i = 0; i < 75; i++) {
+            Class<? extends CancellableTask> taskType = randomBoolean() ? SearchTask.class : SearchShardTask.class;
+            if (i % 5 == 0) {
+                activeSearchTasks.put(i, createMockTaskWithResourceStats(taskType, 500, 800, i));
+            } else {
+                activeSearchTasks.put(i, createMockTaskWithResourceStats(taskType, 100, 800, i));
+            }
+        }
+        doReturn(activeSearchTasks).when(mockTaskResourceTrackingService).getResourceAwareTasks();
+
+        // this will trigger cancellation but the cancellation should not happen as the node is not is duress because of search traffic
+        service.doRun();
+
+        verify(mockTaskManager, times(0)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any());
+        assertEquals(0, service.getSearchBackpressureState(SearchTask.class).getCancellationCount());
+        assertEquals(0, service.getSearchBackpressureState(SearchShardTask.class).getCancellationCount());
+    }
+
     private SearchBackpressureSettings getBackpressureSettings(String mode, double ratio, double rate, double burst) {
         return spy(
             new SearchBackpressureSettings(
@@ -342,11 +575,14 @@ private SearchBackpressureSettings getBackpressureSettings(String mode, double r
         );
     }
 
-    private TaskResourceUsageTracker getMockedTaskResourceUsageTracker() {
+    private TaskResourceUsageTracker getMockedTaskResourceUsageTracker(
+        TaskResourceUsageTrackerType type,
+        TaskResourceUsageTracker.ResourceUsageBreachEvaluator evaluator
+    ) {
         return new TaskResourceUsageTracker() {
             @Override
             public String name() {
-                return TaskResourceUsageTrackerType.CPU_USAGE_TRACKER.getName();
+                return type.getName();
             }
 
             @Override
@@ -354,11 +590,7 @@ public void update(Task task) {}
 
             @Override
             public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
-                if (task.getTotalResourceStats().getCpuTimeInNanos() < 300) {
-                    return Optional.empty();
-                }
-
-                return Optional.of(new TaskCancellation.Reason("limits exceeded", 5));
+                return evaluator.evaluate(task);
             }
 
             @Override
diff --git a/server/src/test/java/org/opensearch/search/backpressure/stats/SearchShardTaskStatsTests.java b/server/src/test/java/org/opensearch/search/backpressure/stats/SearchShardTaskStatsTests.java
index 6478fdfff61d4..89a743efa09d8 100644
--- a/server/src/test/java/org/opensearch/search/backpressure/stats/SearchShardTaskStatsTests.java
+++ b/server/src/test/java/org/opensearch/search/backpressure/stats/SearchShardTaskStatsTests.java
@@ -12,8 +12,8 @@
 import org.opensearch.search.backpressure.trackers.CpuUsageTracker;
 import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker;
 import org.opensearch.search.backpressure.trackers.HeapUsageTracker;
-import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker;
 import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType;
+import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker;
 import org.opensearch.test.AbstractWireSerializingTestCase;
 
 import java.util.Map;
diff --git a/server/src/test/java/org/opensearch/search/backpressure/stats/SearchTaskStatsTests.java b/server/src/test/java/org/opensearch/search/backpressure/stats/SearchTaskStatsTests.java
index eb33bc1c37b7e..7117e6a33a5af 100644
--- a/server/src/test/java/org/opensearch/search/backpressure/stats/SearchTaskStatsTests.java
+++ b/server/src/test/java/org/opensearch/search/backpressure/stats/SearchTaskStatsTests.java
@@ -12,8 +12,8 @@
 import org.opensearch.search.backpressure.trackers.CpuUsageTracker;
 import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker;
 import org.opensearch.search.backpressure.trackers.HeapUsageTracker;
-import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker;
 import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType;
+import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker;
 import org.opensearch.test.AbstractWireSerializingTestCase;
 
 import java.util.Map;
diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java
index 8cdcbc7511bd2..0117b0ed71c27 100644
--- a/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java
+++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java
@@ -33,7 +33,7 @@ public class CpuUsageTrackerTests extends OpenSearchTestCase {
     );
 
     public void testSearchTaskEligibleForCancellation() {
-        Task task = createMockTaskWithResourceStats(SearchTask.class, 100000000, 200);
+        Task task = createMockTaskWithResourceStats(SearchTask.class, 100000000, 200, randomNonNegativeLong());
         CpuUsageTracker tracker = new CpuUsageTracker(mockSettings.getSearchTaskSettings()::getCpuTimeNanosThreshold);
 
         Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);
@@ -43,7 +43,7 @@ public void testSearchTaskEligibleForCancellation() {
     }
 
     public void testSearchShardTaskEligibleForCancellation() {
-        Task task = createMockTaskWithResourceStats(SearchShardTask.class, 200000000, 200);
+        Task task = createMockTaskWithResourceStats(SearchShardTask.class, 200000000, 200, randomNonNegativeLong());
         CpuUsageTracker tracker = new CpuUsageTracker(mockSettings.getSearchShardTaskSettings()::getCpuTimeNanosThreshold);
 
         Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);
@@ -53,7 +53,7 @@ public void testSearchShardTaskEligibleForCancellation() {
     }
 
     public void testNotEligibleForCancellation() {
-        Task task = createMockTaskWithResourceStats(SearchShardTask.class, 5000000, 200);
+        Task task = createMockTaskWithResourceStats(SearchShardTask.class, 5000000, 200, randomNonNegativeLong());
         CpuUsageTracker tracker = new CpuUsageTracker(mockSettings.getSearchShardTaskSettings()::getCpuTimeNanosThreshold);
 
         Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);
diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java
index 921d01e7355a7..514f1b4785aa1 100644
--- a/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java
+++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java
@@ -47,7 +47,7 @@ public void testSearchTaskEligibleForCancellation() {
     }
 
     public void testSearchShardTaskEligibleForCancellation() {
-        Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 0);
+        Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 0, randomNonNegativeLong());
         ElapsedTimeTracker tracker = new ElapsedTimeTracker(
             mockSettings.getSearchShardTaskSettings()::getElapsedTimeNanosThreshold,
             () -> 200000000
@@ -60,7 +60,7 @@ public void testSearchShardTaskEligibleForCancellation() {
     }
 
     public void testNotEligibleForCancellation() {
-        Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 150000000);
+        Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 150000000, randomNonNegativeLong());
         ElapsedTimeTracker tracker = new ElapsedTimeTracker(
             mockSettings.getSearchShardTaskSettings()::getElapsedTimeNanosThreshold,
             () -> 200000000
diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java
index 3950d00b0c8b5..1c46305e9fda6 100644
--- a/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java
+++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java
@@ -58,7 +58,7 @@ public void testSearchTaskEligibleForCancellation() {
                 SearchTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE
             )
         );
-        Task task = createMockTaskWithResourceStats(SearchTask.class, 1, 50);
+        Task task = createMockTaskWithResourceStats(SearchTask.class, 1, 50, randomNonNegativeLong());
 
         // Record enough observations to make the moving average 'ready'.
         for (int i = 0; i < HEAP_MOVING_AVERAGE_WINDOW_SIZE; i++) {
@@ -66,7 +66,7 @@ public void testSearchTaskEligibleForCancellation() {
         }
 
         // Task that has heap usage >= heapBytesThreshold and (movingAverage * heapVariance).
-        task = createMockTaskWithResourceStats(SearchTask.class, 1, 300);
+        task = createMockTaskWithResourceStats(SearchTask.class, 1, 300, randomNonNegativeLong());
         Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);
         assertTrue(reason.isPresent());
         assertEquals(6, reason.get().getCancellationScore());
@@ -88,7 +88,7 @@ public void testSearchShardTaskEligibleForCancellation() {
                 SearchShardTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE
             )
         );
-        Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 50);
+        Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 50, randomNonNegativeLong());
 
         // Record enough observations to make the moving average 'ready'.
         for (int i = 0; i < HEAP_MOVING_AVERAGE_WINDOW_SIZE; i++) {
@@ -96,7 +96,7 @@ public void testSearchShardTaskEligibleForCancellation() {
         }
 
         // Task that has heap usage >= heapBytesThreshold and (movingAverage * heapVariance).
-        task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 200);
+        task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 200, randomNonNegativeLong());
         Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);
         assertTrue(reason.isPresent());
         assertEquals(4, reason.get().getCancellationScore());
@@ -122,7 +122,7 @@ public void testNotEligibleForCancellation() {
         );
 
         // Task with heap usage < heapBytesThreshold.
-        task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 99);
+        task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 99, randomNonNegativeLong());
 
         // Not enough observations.
         reason = tracker.checkAndMaybeGetCancellationReason(task);
@@ -139,7 +139,12 @@ public void testNotEligibleForCancellation() {
 
         // Task with heap usage between heapBytesThreshold and (movingAverage * heapVariance) should not be cancelled.
         double allowedHeapUsage = 99.0 * 2.0;
-        task = createMockTaskWithResourceStats(SearchShardTask.class, 1, randomLongBetween(99, (long) allowedHeapUsage - 1));
+        task = createMockTaskWithResourceStats(
+            SearchShardTask.class,
+            1,
+            randomLongBetween(99, (long) allowedHeapUsage - 1),
+            randomNonNegativeLong()
+        );
         reason = tracker.checkAndMaybeGetCancellationReason(task);
         assertFalse(reason.isPresent());
     }
@@ -148,12 +153,12 @@ public void testIsHeapUsageDominatedBySearch() {
         assumeTrue("Skip the test if the hardware doesn't support heap usage tracking", HeapUsageTracker.isHeapTrackingSupported());
 
         // task with 1 byte of heap usage so that it does not breach the threshold
-        CancellableTask task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1);
+        CancellableTask task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, randomNonNegativeLong());
         assertFalse(HeapUsageTracker.isHeapUsageDominatedBySearch(List.of(task), 0.5));
 
         long totalHeap = JvmStats.jvmStats().getMem().getHeapMax().getBytes();
         // task with heap usage of [totalHeap - 1] so that it breaches the threshold
-        task = createMockTaskWithResourceStats(SearchShardTask.class, 1, totalHeap - 1);
+        task = createMockTaskWithResourceStats(SearchShardTask.class, 1, totalHeap - 1, randomNonNegativeLong());
         assertTrue(HeapUsageTracker.isHeapUsageDominatedBySearch(List.of(task), 0.5));
     }
 }
diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackerTests.java
index 472ba95566523..32aca6ac3230e 100644
--- a/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackerTests.java
+++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackerTests.java
@@ -8,6 +8,7 @@
 
 package org.opensearch.search.backpressure.trackers;
 
+import org.opensearch.search.backpressure.trackers.NodeDuressTrackers.NodeDuressTracker;
 import org.opensearch.test.OpenSearchTestCase;
 
 import java.util.concurrent.atomic.AtomicReference;
@@ -16,20 +17,20 @@ public class NodeDuressTrackerTests extends OpenSearchTestCase {
 
     public void testNodeDuressTracker() {
         AtomicReference<Double> cpuUsage = new AtomicReference<>(0.0);
-        NodeDuressTracker tracker = new NodeDuressTracker(() -> cpuUsage.get() >= 0.5);
+        NodeDuressTracker tracker = new NodeDuressTracker(() -> cpuUsage.get() >= 0.5, () -> 3);
 
         // Node not in duress.
-        assertEquals(0, tracker.check());
+        assertFalse(tracker.test());
 
         // Node in duress; the streak must keep increasing.
         cpuUsage.set(0.7);
-        assertEquals(1, tracker.check());
-        assertEquals(2, tracker.check());
-        assertEquals(3, tracker.check());
+        assertFalse(tracker.test());
+        assertFalse(tracker.test());
+        assertTrue(tracker.test());
 
         // Node not in duress anymore.
         cpuUsage.set(0.3);
-        assertEquals(0, tracker.check());
-        assertEquals(0, tracker.check());
+        assertFalse(tracker.test());
+        assertFalse(tracker.test());
     }
 }
diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackersTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackersTests.java
new file mode 100644
index 0000000000000..2db251ee461db
--- /dev/null
+++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/NodeDuressTrackersTests.java
@@ -0,0 +1,84 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.backpressure.trackers;
+
+import org.opensearch.search.ResourceType;
+import org.opensearch.search.backpressure.trackers.NodeDuressTrackers.NodeDuressTracker;
+import org.opensearch.test.OpenSearchTestCase;
+
+import java.util.EnumMap;
+
+public class NodeDuressTrackersTests extends OpenSearchTestCase {
+
+    public void testNodeNotInDuress() {
+        EnumMap<ResourceType, NodeDuressTracker> map = new EnumMap<>(ResourceType.class) {
+            {
+                put(ResourceType.JVM, new NodeDuressTracker(() -> false, () -> 2));
+                put(ResourceType.CPU, new NodeDuressTracker(() -> false, () -> 2));
+            }
+        };
+
+        NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map);
+
+        assertFalse(nodeDuressTrackers.isNodeInDuress());
+        assertFalse(nodeDuressTrackers.isNodeInDuress());
+        assertFalse(nodeDuressTrackers.isNodeInDuress());
+    }
+
+    public void testNodeInDuressWhenHeapInDuress() {
+        EnumMap<ResourceType, NodeDuressTracker> map = new EnumMap<>(ResourceType.class) {
+            {
+                put(ResourceType.JVM, new NodeDuressTracker(() -> true, () -> 3));
+                put(ResourceType.CPU, new NodeDuressTracker(() -> false, () -> 1));
+            }
+        };
+
+        NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map);
+
+        assertFalse(nodeDuressTrackers.isNodeInDuress());
+        assertFalse(nodeDuressTrackers.isNodeInDuress());
+
+        // for the third time it should be in duress
+        assertTrue(nodeDuressTrackers.isNodeInDuress());
+    }
+
+    public void testNodeInDuressWhenCPUInDuress() {
+        EnumMap<ResourceType, NodeDuressTracker> map = new EnumMap<>(ResourceType.class) {
+            {
+                put(ResourceType.JVM, new NodeDuressTracker(() -> false, () -> 1));
+                put(ResourceType.CPU, new NodeDuressTracker(() -> true, () -> 3));
+            }
+        };
+
+        NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map);
+
+        assertFalse(nodeDuressTrackers.isNodeInDuress());
+        assertFalse(nodeDuressTrackers.isNodeInDuress());
+
+        // for the third time it should be in duress
+        assertTrue(nodeDuressTrackers.isNodeInDuress());
+    }
+
+    public void testNodeInDuressWhenCPUAndHeapInDuress() {
+        EnumMap<ResourceType, NodeDuressTracker> map = new EnumMap<>(ResourceType.class) {
+            {
+                put(ResourceType.JVM, new NodeDuressTracker(() -> true, () -> 3));
+                put(ResourceType.CPU, new NodeDuressTracker(() -> false, () -> 3));
+            }
+        };
+
+        NodeDuressTrackers nodeDuressTrackers = new NodeDuressTrackers(map);
+
+        assertFalse(nodeDuressTrackers.isNodeInDuress());
+        assertFalse(nodeDuressTrackers.isNodeInDuress());
+
+        // for the third time it should be in duress
+        assertTrue(nodeDuressTrackers.isNodeInDuress());
+    }
+}
diff --git a/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java b/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java
index e74f89c905499..f08c12ea258ca 100644
--- a/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java
+++ b/server/src/test/java/org/opensearch/tasks/TaskCancellationTests.java
@@ -9,7 +9,7 @@
 package org.opensearch.tasks;
 
 import org.opensearch.action.search.SearchShardTask;
-import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker;
+import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker;
 import org.opensearch.test.OpenSearchTestCase;
 
 import java.util.ArrayList;
@@ -69,7 +69,7 @@ public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task
             }
 
             @Override
-            public Stats stats(List<? extends Task> activeTasks) {
+            public TaskResourceUsageTracker.Stats stats(List<? extends Task> activeTasks) {
                 return null;
             }
         };
diff --git a/test/framework/src/main/java/org/opensearch/search/backpressure/SearchBackpressureTestHelpers.java b/test/framework/src/main/java/org/opensearch/search/backpressure/SearchBackpressureTestHelpers.java
index af06b1688dca2..8f31f2a60ea86 100644
--- a/test/framework/src/main/java/org/opensearch/search/backpressure/SearchBackpressureTestHelpers.java
+++ b/test/framework/src/main/java/org/opensearch/search/backpressure/SearchBackpressureTestHelpers.java
@@ -21,19 +21,21 @@
 
 public class SearchBackpressureTestHelpers extends OpenSearchTestCase {
 
-    public static <T extends CancellableTask> T createMockTaskWithResourceStats(Class<T> type, long cpuUsage, long heapUsage) {
-        return createMockTaskWithResourceStats(type, cpuUsage, heapUsage, 0);
+    public static <T extends CancellableTask> T createMockTaskWithResourceStats(Class<T> type, long cpuUsage, long heapUsage, long taskId) {
+        return createMockTaskWithResourceStats(type, cpuUsage, heapUsage, 0, taskId);
     }
 
     public static <T extends CancellableTask> T createMockTaskWithResourceStats(
         Class<T> type,
         long cpuUsage,
         long heapUsage,
-        long startTimeNanos
+        long startTimeNanos,
+        long taskId
     ) {
         T task = mock(type);
         when(task.getTotalResourceStats()).thenReturn(new TaskResourceUsage(cpuUsage, heapUsage));
         when(task.getStartTimeNanos()).thenReturn(startTimeNanos);
+        when(task.getId()).thenReturn(randomNonNegativeLong());
 
         AtomicBoolean isCancelled = new AtomicBoolean(false);
         doAnswer(invocation -> {