Skip to content

Commit

Permalink
Caching avg total bytes and avg free bytes inside ClusterInfo
Browse files Browse the repository at this point in the history
Signed-off-by: RS146BIJAY <[email protected]>
  • Loading branch information
RS146BIJAY committed Jul 22, 2024
1 parent b980b12 commit c2caeb4
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 5 deletions.
38 changes: 38 additions & 0 deletions server/src/main/java/org/opensearch/cluster/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.cluster;

import org.opensearch.Version;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamInput;
Expand Down Expand Up @@ -68,9 +69,12 @@ public class ClusterInfo implements ToXContentFragment, Writeable {
final Map<ShardRouting, String> routingToDataPath;
final Map<NodeAndPath, ReservedSpace> reservedSpace;
final Map<String, FileCacheStats> nodeFileCacheStats;
private long avgTotalBytes;
private long avgFreeByte;

protected ClusterInfo() {
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
calculateAvgFreeAndTotalBytes(mostAvailableSpaceUsage);
}

/**
Expand All @@ -97,6 +101,7 @@ public ClusterInfo(
this.routingToDataPath = routingToDataPath;
this.reservedSpace = reservedSpace;
this.nodeFileCacheStats = nodeFileCacheStats;
calculateAvgFreeAndTotalBytes(mostAvailableSpaceUsage);
}

public ClusterInfo(StreamInput in) throws IOException {
Expand All @@ -117,6 +122,39 @@ public ClusterInfo(StreamInput in) throws IOException {
} else {
this.nodeFileCacheStats = Map.of();
}

calculateAvgFreeAndTotalBytes(mostAvailableSpaceUsage);
}

/**
* Returns a {@link DiskUsage} for the {@link RoutingNode} using the
* average usage of other nodes in the disk usage map.
* @param usages Map of nodeId to DiskUsage for all known nodes
*/
private void calculateAvgFreeAndTotalBytes(final Map<String, DiskUsage> usages) {
if (usages == null || usages.isEmpty()) {
this.avgTotalBytes = 0;
this.avgFreeByte = 0;
return;
}

long totalBytes = 0;
long freeBytes = 0;
for (DiskUsage du : usages.values()) {
totalBytes += du.getTotalBytes();
freeBytes += du.getFreeBytes();
}

this.avgTotalBytes = totalBytes / usages.size();
this.avgFreeByte = freeBytes / usages.size();
}

public long getAvgFreeByte() {
return avgFreeByte;
}

public long getAvgTotalBytes() {
return avgTotalBytes;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,8 @@ public static long sizeOfRelocatingShards(

// Where reserved space is unavailable (e.g. stats are out-of-sync) compute a conservative estimate for initialising shards
final List<ShardRouting> initializingShards = node.shardsWithState(ShardRoutingState.INITIALIZING);
initializingShards.removeIf(shardRouting -> reservedSpace.containsShardId(shardRouting.shardId()));
for (ShardRouting routing : initializingShards) {
if (routing.relocatingNodeId() == null) {
if (routing.relocatingNodeId() == null || reservedSpace.containsShardId(routing.shardId())) {
// in practice the only initializing-but-not-relocating shards with a nonzero expected shard size will be ones created
// by a resize (shrink/split/clone) operation which we expect to happen using hard links, so they shouldn't be taking
// any additional space and can be ignored here
Expand Down Expand Up @@ -230,7 +229,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing

// subtractLeavingShards is passed as false here, because they still use disk space, and therefore we should be extra careful
// and take the size into account
final DiskUsageWithRelocations usage = getDiskUsage(node, allocation, usages, false);
final DiskUsageWithRelocations usage = getDiskUsage(node, allocation, usages, clusterInfo.getAvgFreeByte(), clusterInfo.getAvgTotalBytes(), false);
// First, check that the node currently over the low watermark
double freeDiskPercentage = usage.getFreeDiskAsPercentage();
// Cache the used disk percentage for displaying disk percentages consistent with documentation
Expand Down Expand Up @@ -492,7 +491,7 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl

// subtractLeavingShards is passed as true here, since this is only for shards remaining, we will *eventually* have enough disk
// since shards are moving away. No new shards will be incoming since in canAllocate we pass false for this check.
final DiskUsageWithRelocations usage = getDiskUsage(node, allocation, usages, true);
final DiskUsageWithRelocations usage = getDiskUsage(node, allocation, usages, clusterInfo.getAvgFreeByte(), clusterInfo.getAvgTotalBytes(), false);
final String dataPath = clusterInfo.getDataPath(shardRouting);
// If this node is already above the high threshold, the shard cannot remain (get it off!)
final double freeDiskPercentage = usage.getFreeDiskAsPercentage();
Expand Down Expand Up @@ -581,13 +580,15 @@ private DiskUsageWithRelocations getDiskUsage(
RoutingNode node,
RoutingAllocation allocation,
final Map<String, DiskUsage> usages,
final long avgFreeBytes,
final long avgTotalBytes,
boolean subtractLeavingShards
) {
DiskUsage usage = usages.get(node.nodeId());
if (usage == null) {
// If there is no usage, and we have other nodes in the cluster,
// use the average usage for all nodes as the usage for this node
usage = averageUsage(node, usages);
usage = new DiskUsage(node.nodeId(), node.node().getName(), "_na_", avgTotalBytes, avgFreeBytes);
if (logger.isDebugEnabled()) {
logger.debug(
"unable to determine disk usage for {}, defaulting to average across nodes [{} total] [{} free] [{}% free]",
Expand Down

0 comments on commit c2caeb4

Please sign in to comment.