From ec0ffc3e03ce48057a955b6e97c3c0dc2b103a45 Mon Sep 17 00:00:00 2001 From: Swetha Guptha Date: Wed, 28 Aug 2024 10:57:45 +0530 Subject: [PATCH] Store indices and shards health based on cluster health level param Signed-off-by: Swetha Guptha --- .../cluster/health/ClusterHealthResponse.java | 45 ++++++++++++ .../health/TransportClusterHealthAction.java | 3 + .../stats/TransportClusterStatsAction.java | 3 +- .../cluster/health/ClusterIndexHealth.java | 52 ++++++++++++++ .../cluster/health/ClusterStateHealth.java | 69 +++++++++++++++++++ .../routing/allocation/AllocationService.java | 3 +- 6 files changed, 173 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthResponse.java index 1a27f161343e8..07cca31f617b0 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthResponse.java @@ -237,6 +237,49 @@ public ClusterHealthResponse( this.clusterHealthStatus = clusterStateHealth.getStatus(); } + public ClusterHealthResponse( + String clusterName, + String[] concreteIndices, + ClusterHealthRequest.Level healthLevel, + ClusterState clusterState, + int numberOfPendingTasks, + int numberOfInFlightFetch, + int delayedUnassignedShards, + TimeValue taskMaxWaitingTime + ) { + this.clusterName = clusterName; + this.numberOfPendingTasks = numberOfPendingTasks; + this.numberOfInFlightFetch = numberOfInFlightFetch; + this.delayedUnassignedShards = delayedUnassignedShards; + this.taskMaxWaitingTime = taskMaxWaitingTime; + this.clusterStateHealth = new ClusterStateHealth(clusterState, concreteIndices, healthLevel); + this.clusterHealthStatus = clusterStateHealth.getStatus(); + } + + // Awareness Attribute health + public ClusterHealthResponse( + String clusterName, + ClusterState clusterState, + ClusterSettings clusterSettings, + String[] concreteIndices, + String awarenessAttributeName, + int numberOfPendingTasks, + int numberOfInFlightFetch, + int delayedUnassignedShards, + TimeValue taskMaxWaitingTime + ) { + this( + clusterName, + concreteIndices, + clusterState, + numberOfPendingTasks, + numberOfInFlightFetch, + delayedUnassignedShards, + taskMaxWaitingTime + ); + this.clusterAwarenessHealth = new ClusterAwarenessHealth(clusterState, clusterSettings, awarenessAttributeName); + } + // Awareness Attribute health public ClusterHealthResponse( String clusterName, @@ -244,6 +287,7 @@ public ClusterHealthResponse( ClusterSettings clusterSettings, String[] concreteIndices, String awarenessAttributeName, + ClusterHealthRequest.Level healthLevel, int numberOfPendingTasks, int numberOfInFlightFetch, int delayedUnassignedShards, @@ -252,6 +296,7 @@ public ClusterHealthResponse( this( clusterName, concreteIndices, + healthLevel, clusterState, numberOfPendingTasks, numberOfInFlightFetch, diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java index f69f462372888..2a1d98b3168fc 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java @@ -500,6 +500,7 @@ private ClusterHealthResponse clusterHealth( clusterService.getClusterSettings(), concreteIndices, awarenessAttribute, + request.level(), numberOfPendingTasks, numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(clusterState), @@ -514,6 +515,7 @@ private ClusterHealthResponse clusterHealth( ClusterHealthResponse response = new ClusterHealthResponse( clusterState.getClusterName().value(), Strings.EMPTY_ARRAY, + request.level(), clusterState, numberOfPendingTasks, numberOfInFlightFetch, @@ -527,6 +529,7 @@ private ClusterHealthResponse clusterHealth( return new ClusterHealthResponse( clusterState.getClusterName().value(), concreteIndices, + request.level(), clusterState, numberOfPendingTasks, numberOfInFlightFetch, diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 9a76beb154882..25a0aa1568bae 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -34,6 +34,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.action.FailedNodeException; +import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; import org.opensearch.action.admin.cluster.node.info.NodeInfo; import org.opensearch.action.admin.cluster.node.stats.NodeStats; import org.opensearch.action.admin.indices.stats.CommonStats; @@ -210,7 +211,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq ClusterHealthStatus clusterStatus = null; if (clusterService.state().nodes().isLocalNodeElectedClusterManager()) { - clusterStatus = new ClusterStateHealth(clusterService.state()).getStatus(); + clusterStatus = new ClusterStateHealth(clusterService.state(), ClusterHealthRequest.Level.CLUSTER).getStatus(); } return new ClusterStatsNodeResponse( diff --git a/server/src/main/java/org/opensearch/cluster/health/ClusterIndexHealth.java b/server/src/main/java/org/opensearch/cluster/health/ClusterIndexHealth.java index 19c64965e6941..f615bbd04fcca 100644 --- a/server/src/main/java/org/opensearch/cluster/health/ClusterIndexHealth.java +++ b/server/src/main/java/org/opensearch/cluster/health/ClusterIndexHealth.java @@ -32,6 +32,7 @@ package org.opensearch.cluster.health; +import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; @@ -191,6 +192,57 @@ public ClusterIndexHealth(final IndexMetadata indexMetadata, final IndexRoutingT this.unassignedShards = computeUnassignedShards; } + public ClusterIndexHealth(final IndexMetadata indexMetadata, final IndexRoutingTable indexRoutingTable, final ClusterHealthRequest.Level healthLevel) { + this.index = indexMetadata.getIndex().getName(); + this.numberOfShards = indexMetadata.getNumberOfShards(); + this.numberOfReplicas = indexMetadata.getNumberOfReplicas(); + + shards = new HashMap<>(); + boolean isShardLevelHealthInfo = healthLevel == ClusterHealthRequest.Level.SHARDS; + + // update the index status + ClusterHealthStatus computeStatus = ClusterHealthStatus.GREEN; + int computeActivePrimaryShards = 0; + int computeActiveShards = 0; + int computeRelocatingShards = 0; + int computeInitializingShards = 0; + int computeUnassignedShards = 0; + for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) { + int shardId = shardRoutingTable.shardId().id(); + ClusterShardHealth shardHealth = new ClusterShardHealth(shardId, shardRoutingTable); + + if (shardHealth.isPrimaryActive()) { + computeActivePrimaryShards++; + } + computeActiveShards += shardHealth.getActiveShards(); + computeRelocatingShards += shardHealth.getRelocatingShards(); + computeInitializingShards += shardHealth.getInitializingShards(); + computeUnassignedShards += shardHealth.getUnassignedShards(); + + if (shardHealth.getStatus() == ClusterHealthStatus.RED) { + computeStatus = ClusterHealthStatus.RED; + } else if (shardHealth.getStatus() == ClusterHealthStatus.YELLOW && computeStatus != ClusterHealthStatus.RED) { + // do not override an existing red + computeStatus = ClusterHealthStatus.YELLOW; + } + + if (isShardLevelHealthInfo) { + shards.put(shardId, shardHealth); + } + } + + if (indexRoutingTable.shards().isEmpty()) { // might be since none has been created yet (two phase index creation) + computeStatus = ClusterHealthStatus.RED; + } + + this.status = computeStatus; + this.activePrimaryShards = computeActivePrimaryShards; + this.activeShards = computeActiveShards; + this.relocatingShards = computeRelocatingShards; + this.initializingShards = computeInitializingShards; + this.unassignedShards = computeUnassignedShards; + } + public ClusterIndexHealth(final StreamInput in) throws IOException { index = in.readString(); numberOfShards = in.readVInt(); diff --git a/server/src/main/java/org/opensearch/cluster/health/ClusterStateHealth.java b/server/src/main/java/org/opensearch/cluster/health/ClusterStateHealth.java index 083159bffdc2b..6ce92af6a95f8 100644 --- a/server/src/main/java/org/opensearch/cluster/health/ClusterStateHealth.java +++ b/server/src/main/java/org/opensearch/cluster/health/ClusterStateHealth.java @@ -31,6 +31,7 @@ package org.opensearch.cluster.health; +import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.IndexRoutingTable; @@ -76,6 +77,10 @@ public ClusterStateHealth(final ClusterState clusterState) { this(clusterState, clusterState.metadata().getConcreteAllIndices()); } + public ClusterStateHealth(final ClusterState clusterState, final ClusterHealthRequest.Level clusterHealthLevel) { + this(clusterState, clusterState.metadata().getConcreteAllIndices(), clusterHealthLevel); + } + /** * Creates a new ClusterStateHealth instance considering the current cluster state and the provided index names. * @@ -145,6 +150,70 @@ public ClusterStateHealth(final ClusterState clusterState, final String[] concre } } + public ClusterStateHealth(final ClusterState clusterState, final String[] concreteIndices, final ClusterHealthRequest.Level healthLevel) { + numberOfNodes = clusterState.nodes().getSize(); + numberOfDataNodes = clusterState.nodes().getDataNodes().size(); + hasDiscoveredClusterManager = clusterState.nodes().getClusterManagerNodeId() != null; + indices = new HashMap<>(); + boolean isIndexLevelHealthInfo = healthLevel == ClusterHealthRequest.Level.INDICES || healthLevel == ClusterHealthRequest.Level.SHARDS; + + ClusterHealthStatus computeStatus = ClusterHealthStatus.GREEN; + int computeActivePrimaryShards = 0; + int computeActiveShards = 0; + int computeRelocatingShards = 0; + int computeInitializingShards = 0; + int computeUnassignedShards = 0; + for (String index : concreteIndices) { + IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(index); + IndexMetadata indexMetadata = clusterState.metadata().index(index); + if (indexRoutingTable == null) { + continue; + } + + ClusterIndexHealth indexHealth = new ClusterIndexHealth(indexMetadata, indexRoutingTable); + computeActivePrimaryShards += indexHealth.getActivePrimaryShards(); + computeActiveShards += indexHealth.getActiveShards(); + computeRelocatingShards += indexHealth.getRelocatingShards(); + computeInitializingShards += indexHealth.getInitializingShards(); + computeUnassignedShards += indexHealth.getUnassignedShards(); + if (indexHealth.getStatus() == ClusterHealthStatus.RED) { + computeStatus = ClusterHealthStatus.RED; + } else if (indexHealth.getStatus() == ClusterHealthStatus.YELLOW && computeStatus != ClusterHealthStatus.RED) { + computeStatus = ClusterHealthStatus.YELLOW; + } + + if (isIndexLevelHealthInfo) { + // Store ClusterIndexHealth only when the health is requested at Index or Shard level + indices.put(indexHealth.getIndex(), indexHealth); + } + } + + if (clusterState.blocks().hasGlobalBlockWithStatus(RestStatus.SERVICE_UNAVAILABLE)) { + computeStatus = ClusterHealthStatus.RED; + } + + this.status = computeStatus; + this.activePrimaryShards = computeActivePrimaryShards; + this.activeShards = computeActiveShards; + this.relocatingShards = computeRelocatingShards; + this.initializingShards = computeInitializingShards; + this.unassignedShards = computeUnassignedShards; + + // shortcut on green + if (computeStatus.equals(ClusterHealthStatus.GREEN)) { + this.activeShardsPercent = 100; + } else { + List shardRoutings = clusterState.getRoutingTable().allShards(); + int activeShardCount = 0; + int totalShardCount = 0; + for (ShardRouting shardRouting : shardRoutings) { + if (shardRouting.active()) activeShardCount++; + totalShardCount++; + } + this.activeShardsPercent = (((double) activeShardCount) / totalShardCount) * 100; + } + } + public ClusterStateHealth(final StreamInput in) throws IOException { activePrimaryShards = in.readVInt(); activeShards = in.readVInt(); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java index e29a81a2c131f..2769d5eb459f8 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.Version; +import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; import org.opensearch.cluster.ClusterInfoService; import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.ClusterState; @@ -196,7 +197,7 @@ public ClusterState applyStartedShards(ClusterState clusterState, List