Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Kaushal Kumar <[email protected]>
  • Loading branch information
kaushalmahi12 committed Sep 9, 2024
1 parent 34184ef commit 91893e7
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 138 deletions.
27 changes: 8 additions & 19 deletions server/src/main/java/org/opensearch/wlm/ResourceType.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@

package org.opensearch.wlm;

import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.wlm.tracker.CpuUsageCalculator;
import org.opensearch.wlm.tracker.MemoryUsageCalculator;
import org.opensearch.wlm.tracker.QueryGroupUsageHelper;
import org.opensearch.wlm.tracker.ResourceUsageCalculator;

import java.io.IOException;
import java.util.List;
import java.util.function.Function;

/**
* Enum to hold the resource type
Expand All @@ -26,35 +25,25 @@
*/
@PublicApi(since = "2.17.0")
public enum ResourceType {
CPU("cpu", true, CpuUsageCalculator.INSTANCE, new QueryGroupUsageHelper() {
@Override
protected double getNormalisedThreshold(QueryGroup queryGroup) {
return queryGroup.getResourceLimits().get(ResourceType.CPU) * getSettings().getNodeLevelCpuCancellationThreshold();
}
}),
MEMORY("memory", true, MemoryUsageCalculator.INSTANCE, new QueryGroupUsageHelper() {
@Override
protected double getNormalisedThreshold(QueryGroup queryGroup) {
return queryGroup.getResourceLimits().get(ResourceType.MEMORY) * getSettings().getNodeLevelMemoryCancellationThreshold();
}
});
CPU("cpu", true, CpuUsageCalculator.INSTANCE, WorkloadManagementSettings::getNodeLevelCpuCancellationThreshold),
MEMORY("memory", true, MemoryUsageCalculator.INSTANCE, WorkloadManagementSettings::getNodeLevelMemoryCancellationThreshold);

private final String name;
private final boolean statsEnabled;
private final ResourceUsageCalculator resourceUsageCalculator;
private final QueryGroupUsageHelper queryGroupUsageHelper;
private final Function<WorkloadManagementSettings, Double> nodeLevelThresholdSupplier;
private static List<ResourceType> sortedValues = List.of(CPU, MEMORY);

ResourceType(
String name,
boolean statsEnabled,
ResourceUsageCalculator resourceUsageCalculator,
QueryGroupUsageHelper queryGroupUsageHelper
Function<WorkloadManagementSettings, Double> nodeLevelThresholdSupplier
) {
this.name = name;
this.statsEnabled = statsEnabled;
this.resourceUsageCalculator = resourceUsageCalculator;
this.queryGroupUsageHelper = queryGroupUsageHelper;
this.nodeLevelThresholdSupplier = nodeLevelThresholdSupplier;
}

/**
Expand Down Expand Up @@ -87,8 +76,8 @@ public ResourceUsageCalculator getResourceUsageCalculator() {
return resourceUsageCalculator;
}

public QueryGroupUsageHelper getQueryGroupUsage() {
return queryGroupUsageHelper;
public double getNodeLevelThreshold(WorkloadManagementSettings settings) {
return nodeLevelThresholdSupplier.apply(settings);
}

public static List<ResourceType> getSortedValues() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.opensearch.wlm.cancellation.TaskCanceller.MIN_VALUE;
Expand All @@ -25,15 +24,7 @@
*/
public class HighestResourceConsumingTaskFirstSelectionStrategy implements TaskSelectionStrategy {

private final Supplier<Long> nanoTimeSupplier;

public HighestResourceConsumingTaskFirstSelectionStrategy() {
this(System::nanoTime);
}

public HighestResourceConsumingTaskFirstSelectionStrategy(Supplier<Long> nanoTimeSupplier) {
this.nanoTimeSupplier = nanoTimeSupplier;
}
/**
 * Creates a new selection strategy instance; takes no arguments.
 */
public HighestResourceConsumingTaskFirstSelectionStrategy() {}

/**
* Returns a comparator that defines the sorting condition for tasks.
Expand All @@ -43,9 +34,7 @@ public HighestResourceConsumingTaskFirstSelectionStrategy(Supplier<Long> nanoTim
* @return The comparator
*/
private Comparator<QueryGroupTask> sortingCondition(ResourceType resourceType) {
return Comparator.comparingDouble(
task -> resourceType.getResourceUsageCalculator().calculateTaskResourceUsage(task, nanoTimeSupplier)
);
return Comparator.comparingDouble(task -> resourceType.getResourceUsageCalculator().calculateTaskResourceUsage(task));
}

/**
Expand All @@ -72,7 +61,7 @@ public List<QueryGroupTask> selectTasksForCancellation(List<QueryGroupTask> task
double accumulated = 0;
for (QueryGroupTask task : sortedTasks) {
selectedTasks.add(task);
accumulated += resourceType.getResourceUsageCalculator().calculateTaskResourceUsage(task, nanoTimeSupplier);
accumulated += resourceType.getResourceUsageCalculator().calculateTaskResourceUsage(task);
if ((accumulated - limit) > MIN_VALUE) {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.opensearch.wlm.QueryGroupTask;
import org.opensearch.wlm.ResourceType;
import org.opensearch.wlm.WorkloadManagementSettings;
import org.opensearch.wlm.tracker.QueryGroupUsageHelper;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -70,7 +69,6 @@ public TaskCanceller(
this.activeQueryGroups = activeQueryGroups;
this.deletedQueryGroups = deletedQueryGroups;
this.isNodeInDuress = isNodeInDuress;
TRACKED_RESOURCES.forEach(resourceType -> resourceType.getQueryGroupUsage().setSettings(workloadManagementSettings));
}

/**
Expand Down Expand Up @@ -132,13 +130,9 @@ private List<QueryGroup> getQueryGroupsToCancelFrom(ResiliencyMode resiliencyMod
if (queryGroup.getResiliencyMode() != resiliencyMode) {
continue;
}
Map<ResourceType, Double> queryGroupResourcesUsage = queryGroupLevelResourceUsageViews.get(queryGroup.get_id())
.getResourceUsageData();

for (ResourceType resourceType : TRACKED_RESOURCES) {
if (queryGroup.getResourceLimits().containsKey(resourceType)) {
final double currentUsage = queryGroupResourcesUsage.get(resourceType);
if (resourceType.getQueryGroupUsage().isBreachingThresholdFor(queryGroup, currentUsage)) {
if (shouldCancelTasks(queryGroup, resourceType)) {
queryGroupsToCancelFrom.add(queryGroup);
break;
}
Expand Down Expand Up @@ -231,8 +225,17 @@ private double getExcessUsage(QueryGroup queryGroup, ResourceType resourceType)

final QueryGroupLevelResourceUsageView queryGroupResourceUsageView = queryGroupLevelResourceUsageViews.get(queryGroup.get_id());
final double currentUsage = queryGroupResourceUsageView.getResourceUsageData().get(resourceType);
QueryGroupUsageHelper queryGroupUsageHelper = resourceType.getQueryGroupUsage();
return queryGroupUsageHelper.getExcessUsage(queryGroup, currentUsage);
return currentUsage - getNormalisedThreshold(queryGroup, resourceType);
}

/**
 * Scales the limit configured on the query group for the given resource by the node level
 * cancellation threshold of that resource.
 *
 * @param queryGroup query group whose configured resource limit is read
 * @param resourceType resource whose node level cancellation threshold is applied
 * @return the configured limit multiplied by the node level cancellation threshold
 */
private double getNormalisedThreshold(QueryGroup queryGroup, ResourceType resourceType) {
    final double nodeLevelFactor = resourceType.getNodeLevelThreshold(workloadManagementSettings);
    final double configuredLimit = queryGroup.getResourceLimits().get(resourceType);
    return configuredLimit * nodeLevelFactor;
}

private void callbackOnCancel() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,33 @@
import org.opensearch.wlm.QueryGroupTask;

import java.util.List;
import java.util.function.Supplier;
import java.util.function.LongSupplier;

/**
* Helper class that calculates cpu usage for a query group and for its individual tasks
*/
public class CpuUsageCalculator implements ResourceUsageCalculator {
public class CpuUsageCalculator extends ResourceUsageCalculator {
// This value should be initialised at the start time of the process and be used throughout the codebase
public static final int PROCESSOR_COUNT = Runtime.getRuntime().availableProcessors();
public static final CpuUsageCalculator INSTANCE = new CpuUsageCalculator();
// Time source used to compute how long a task has been running. Defaults to the system clock so
// the shared INSTANCE can be used even when no supplier has been injected; previously the field
// stayed null until the setter was called and the first usage calculation would throw.
private LongSupplier nanoTimeSupplier = System::nanoTime;

private CpuUsageCalculator() {}

/**
 * Replaces the nano time source used when computing task level cpu usage.
 *
 * @param nanoTimeSupplier supplier of the current time in nanoseconds
 */
@Override
public void setNanoTimeSupplier(LongSupplier nanoTimeSupplier) {
    this.nanoTimeSupplier = nanoTimeSupplier;
}

@Override
public double calculateResourceUsage(List<QueryGroupTask> tasks, Supplier<Long> timeSupplier) {
double usage = tasks.stream().mapToDouble(task -> calculateTaskResourceUsage(task, timeSupplier)).sum();
public double calculateResourceUsage(List<QueryGroupTask> tasks) {
double usage = tasks.stream().mapToDouble(this::calculateTaskResourceUsage).sum();

usage /= PROCESSOR_COUNT;
return usage;
}

@Override
public double calculateTaskResourceUsage(QueryGroupTask task, Supplier<Long> nanoTimeSupplier) {
return (1.0f * task.getTotalResourceUtilization(ResourceStats.CPU)) / (nanoTimeSupplier.get() - task.getStartTimeNanos());
public double calculateTaskResourceUsage(QueryGroupTask task) {
return (1.0f * task.getTotalResourceUtilization(ResourceStats.CPU)) / (nanoTimeSupplier.getAsLong() - task.getStartTimeNanos());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,23 @@
import org.opensearch.wlm.QueryGroupTask;

import java.util.List;
import java.util.function.Supplier;

/**
* Helper class that calculates memory usage for a query group and for its individual tasks
*/
public class MemoryUsageCalculator implements ResourceUsageCalculator {
public class MemoryUsageCalculator extends ResourceUsageCalculator {
public static final long HEAP_SIZE_BYTES = JvmStats.jvmStats().getMem().getHeapMax().getBytes();
public static final MemoryUsageCalculator INSTANCE = new MemoryUsageCalculator();

private MemoryUsageCalculator() {}

@Override
public double calculateResourceUsage(List<QueryGroupTask> tasks, Supplier<Long> timeSupplier) {
return tasks.stream().mapToDouble(task -> calculateTaskResourceUsage(task, timeSupplier)).sum();
public double calculateResourceUsage(List<QueryGroupTask> tasks) {
return tasks.stream().mapToDouble(this::calculateTaskResourceUsage).sum();
}

@Override
public double calculateTaskResourceUsage(QueryGroupTask task, Supplier<Long> timeSupplier) {
public double calculateTaskResourceUsage(QueryGroupTask task) {
return (1.0f * task.getTotalResourceUtilization(ResourceStats.MEMORY)) / HEAP_SIZE_BYTES;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

/**
Expand All @@ -26,16 +26,15 @@
public class QueryGroupResourceUsageTrackerService {
public static final EnumSet<ResourceType> TRACKED_RESOURCES = EnumSet.allOf(ResourceType.class);
private final TaskResourceTrackingService taskResourceTrackingService;
private final Supplier<Long> nanoTimeSupplier;

/**
* QueryGroupResourceTrackerService constructor
*
* @param taskResourceTrackingService Service that helps track resource usage of tasks running on a node.
*/
public QueryGroupResourceUsageTrackerService(TaskResourceTrackingService taskResourceTrackingService, Supplier<Long> nanoTimeSupplier) {
public QueryGroupResourceUsageTrackerService(TaskResourceTrackingService taskResourceTrackingService, LongSupplier nanoTimeSupplier) {
this.taskResourceTrackingService = taskResourceTrackingService;
this.nanoTimeSupplier = nanoTimeSupplier;
ResourceType.CPU.getResourceUsageCalculator().setNanoTimeSupplier(nanoTimeSupplier);
}

/**
Expand All @@ -52,8 +51,7 @@ public Map<String, QueryGroupLevelResourceUsageView> constructQueryGroupLevelUsa
// Compute the QueryGroup resource usage
final Map<ResourceType, Double> resourceUsage = new HashMap<>();
for (ResourceType resourceType : TRACKED_RESOURCES) {
double usage = resourceType.getResourceUsageCalculator()
.calculateResourceUsage(queryGroupEntry.getValue(), nanoTimeSupplier);
double usage = resourceType.getResourceUsageCalculator().calculateResourceUsage(queryGroupEntry.getValue());
resourceUsage.put(resourceType, usage);
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,32 @@
import org.opensearch.wlm.QueryGroupTask;

import java.util.List;
import java.util.function.Supplier;
import java.util.function.LongSupplier;

/**
* This class is used to track query group level resource usage
*/
@PublicApi(since = "2.18.0")
public interface ResourceUsageCalculator {
public abstract class ResourceUsageCalculator {
/**
* calculates the current resource usage for the query group
*
* @param tasks list of tasks in the query group
* @param timeSupplier nano time supplier
*/
double calculateResourceUsage(List<QueryGroupTask> tasks, Supplier<Long> timeSupplier);
public abstract double calculateResourceUsage(List<QueryGroupTask> tasks);

/**
* calculates the task level resource usage
* @param task QueryGroupTask
* @param timeSupplier in nano seconds unit
* @return task level resource usage
*/
double calculateTaskResourceUsage(QueryGroupTask task, Supplier<Long> timeSupplier);
public abstract double calculateTaskResourceUsage(QueryGroupTask task);

/**
 * Hook for injecting a nano time source. This base implementation is empty and ignores the
 * argument, since only a few implementations might need a time source.
 *
 * @param nanoTimeSupplier supplier of the current time in nanoseconds; unused by this implementation
 */
public void setNanoTimeSupplier(LongSupplier nanoTimeSupplier) {
    // intentionally empty
}
}
Loading

0 comments on commit 91893e7

Please sign in to comment.