From 8099f623e281ce6fd30d735b126aafdf3e9836f5 Mon Sep 17 00:00:00 2001 From: Neetika Singhal Date: Thu, 16 May 2024 16:12:31 -0700 Subject: [PATCH] Add rest, transport, service layer changes for Tiering Signed-off-by: Neetika Singhal --- .../org/opensearch/action/ActionModule.java | 9 + .../HotToWarmTieringRequestContext.java | 217 ++++++ .../tiering/HotToWarmTieringResponse.java | 137 ++++ .../action/tiering/RestWarmTieringAction.java | 57 ++ .../action/tiering/TieringIndexRequest.java | 171 +++++ .../tiering/TransportWarmTieringAction.java | 102 +++ .../action/tiering/WarmTieringAction.java | 26 + .../cluster/routing/RoutingPool.java | 5 +- .../org/opensearch/index/IndexModule.java | 64 ++ .../main/java/org/opensearch/node/Node.java | 10 + .../tiering/HotToWarmTieringService.java | 654 ++++++++++++++++++ .../tiering/TieringServiceValidator.java | 334 +++++++++ .../action/tiering/TieringRequestTests.java | 85 +++ .../tiering/TieringRequestValidatorTests.java | 414 +++++++++++ 14 files changed, 2284 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/org/opensearch/action/tiering/HotToWarmTieringRequestContext.java create mode 100644 server/src/main/java/org/opensearch/action/tiering/HotToWarmTieringResponse.java create mode 100644 server/src/main/java/org/opensearch/action/tiering/RestWarmTieringAction.java create mode 100644 server/src/main/java/org/opensearch/action/tiering/TieringIndexRequest.java create mode 100644 server/src/main/java/org/opensearch/action/tiering/TransportWarmTieringAction.java create mode 100644 server/src/main/java/org/opensearch/action/tiering/WarmTieringAction.java create mode 100644 server/src/main/java/org/opensearch/tiering/HotToWarmTieringService.java create mode 100644 server/src/main/java/org/opensearch/tiering/TieringServiceValidator.java create mode 100644 server/src/test/java/org/opensearch/rest/action/tiering/TieringRequestTests.java create mode 100644 server/src/test/java/org/opensearch/rest/action/tiering/TieringRequestValidatorTests.java diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 16c15f553951c..605854327e266 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -288,6 +288,9 @@ import org.opensearch.action.termvectors.TransportMultiTermVectorsAction; import org.opensearch.action.termvectors.TransportShardMultiTermsVectorAction; import org.opensearch.action.termvectors.TransportTermVectorsAction; +import org.opensearch.action.tiering.RestWarmTieringAction; +import org.opensearch.action.tiering.TransportWarmTieringAction; +import org.opensearch.action.tiering.WarmTieringAction; import org.opensearch.action.update.TransportUpdateAction; import org.opensearch.action.update.UpdateAction; import org.opensearch.client.node.NodeClient; @@ -634,6 +637,9 @@ public void reg actions.register(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class); actions.register(CloneSnapshotAction.INSTANCE, TransportCloneSnapshotAction.class); actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class); + if (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX)) { + actions.register(WarmTieringAction.INSTANCE, TransportWarmTieringAction.class); + } actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class); actions.register(ClusterAddWeightedRoutingAction.INSTANCE, TransportAddWeightedRoutingAction.class); @@ -966,6 +972,9 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestNodeAttrsAction()); registerHandler.accept(new RestRepositoriesAction()); registerHandler.accept(new RestSnapshotAction()); + if (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX)) { + registerHandler.accept(new RestWarmTieringAction()); + } registerHandler.accept(new RestTemplatesAction()); // Point in time API diff --git a/server/src/main/java/org/opensearch/action/tiering/HotToWarmTieringRequestContext.java b/server/src/main/java/org/opensearch/action/tiering/HotToWarmTieringRequestContext.java new file mode 100644 index 0000000000000..e9277e38c21b6 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/tiering/HotToWarmTieringRequestContext.java @@ -0,0 +1,217 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.tiering; + +import org.opensearch.common.UUIDs; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.Index; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class HotToWarmTieringRequestContext { + private Set acceptedIndices; + private Map rejectedIndices; + private Set notFoundIndices; + private Set inProgressIndices; + private Set successfulIndices; + private Set failedIndices; + private ActionListener actionListener; + private TieringIndexRequest request; + private String requestUuid; + + public HotToWarmTieringRequestContext() { + this.acceptedIndices = new HashSet<>(); + this.rejectedIndices = new HashMap<>(); + this.notFoundIndices = new HashSet<>(); + this.successfulIndices = new HashSet<>(); + this.failedIndices = new HashSet<>(); + this.inProgressIndices = new HashSet<>(); + this.requestUuid = UUIDs.randomBase64UUID(); + } + + public HotToWarmTieringRequestContext(ActionListener actionListener, TieringIndexRequest request) { + this(); + this.actionListener = actionListener; + this.request = request; + } + + public HotToWarmTieringRequestContext(Set acceptedIndices, Map rejectedIndices, Set notFoundIndices) { + this.acceptedIndices = acceptedIndices; + this.rejectedIndices = rejectedIndices; + this.notFoundIndices = notFoundIndices; + } + + public Set getAcceptedIndices() { + return acceptedIndices; + } + + public void setAcceptedIndices(Set acceptedIndices) { + this.acceptedIndices = acceptedIndices; + } + + public Map getRejectedIndices() { + return rejectedIndices; + } + + public Set getNotFoundIndices() { + return notFoundIndices; + } + + public void setNotFoundIndices(Set notFoundIndices) { + this.notFoundIndices = notFoundIndices; + } + + public void setRejectedIndices(Map rejectedIndices) { + this.rejectedIndices = rejectedIndices; + } + + public void addToNotFound(String indexName) { + notFoundIndices.add(indexName); + } + + /** + * Method to move indices from success list to failed list with corresponding reasons to be sent in response + * @param indicesToFail - sets of indices that failed validation with same reason + * @param failureReason - reason for not accepting migration for this index + */ + public void addToRejected(Set indicesToFail, String failureReason) { + for (String index : indicesToFail) { + addToRejected(index, failureReason); + } + } + + /** + * Method to move index from success list to failed list with corresponding reasons to be sent in response + * @param indexName - indexName that failed validation + * @param failureReason - reason for not accepting migration for this index + */ + public void addToRejected(String indexName, String failureReason) { + rejectedIndices.put(indexName, failureReason); + } + + public void addToAccepted(Index index) { + acceptedIndices.add(index); + } + + public void addToAccepted(Set indices) { + acceptedIndices.addAll(indices); + } + + public Set getSuccessfulIndices() { + return successfulIndices; + } + + public void setSuccessfulIndices(Set successfulIndices) { + this.successfulIndices = successfulIndices; + } + + public void addToSuccessful(String index) { + successfulIndices.add(index); + } + + public Set getFailedIndices() { + return failedIndices; + } + + public void setFailedIndices(Set failedIndices) { + this.failedIndices = failedIndices; + } + + public void addToFailed(String index) { + failedIndices.add(index); + } + + public ActionListener getListener() { + return actionListener; + } + + public void setActionListener(ActionListener actionListener) { + this.actionListener = actionListener; + } + + public TieringIndexRequest getRequest() { + return request; + } + + public void setRequest(TieringIndexRequest request) { + this.request = request; + } + + public String getRequestUuid() { + return requestUuid; + } + + public void setRequestUuid(String requestUuid) { + this.requestUuid = requestUuid; + } + + public Set getInProgressIndices() { + return inProgressIndices; + } + + public void setInProgressIndices(Set inProgressIndices) { + this.inProgressIndices = inProgressIndices; + } + + public void addToInProgress(Index index) { + inProgressIndices.add(index); + } + + public void removeFromInProgress(Index index) { + inProgressIndices.remove(index); + } + + public boolean isRequestProcessingComplete() { + return inProgressIndices.isEmpty(); + } + + public HotToWarmTieringResponse constructResponse() { + final List indicesResult = new LinkedList<>(); + for (Map.Entry rejectedIndex : rejectedIndices.entrySet()) { + indicesResult.add(new HotToWarmTieringResponse.IndexResult(rejectedIndex.getKey(), rejectedIndex.getValue())); + } + for (String index : notFoundIndices) { + indicesResult.add(new HotToWarmTieringResponse.IndexResult(index, "Index not found")); + } + for (String index : failedIndices) { + indicesResult.add(new HotToWarmTieringResponse.IndexResult(index, "Index failed")); + } + return new HotToWarmTieringResponse(acceptedIndices.size() > 0, indicesResult); + } + + @Override + public String toString() { + return "HotToWarmTieringRequestContext{" + + "acceptedIndices=" + + acceptedIndices + + ", rejectedIndices=" + + rejectedIndices + + ", notFoundIndices=" + + notFoundIndices + + ", inProgressIndices=" + + inProgressIndices + + ", successfulIndices=" + + successfulIndices + + ", failedIndices=" + + failedIndices + + ", actionListener=" + + actionListener + + ", request=" + + request + + ", requestUuid='" + + requestUuid + + '\'' + + '}'; + } +} diff --git a/server/src/main/java/org/opensearch/action/tiering/HotToWarmTieringResponse.java b/server/src/main/java/org/opensearch/action/tiering/HotToWarmTieringResponse.java new file mode 100644 index 0000000000000..78aeea62649d8 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/tiering/HotToWarmTieringResponse.java @@ -0,0 +1,137 @@ +package org.opensearch.action.tiering; + +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.core.common.Strings; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; + +/** + * Response object for an {@link TieringIndexRequest} which is sent to client after the initial verification of the request + * by the backend service. The format of the response object will be as below: + * + * { + * "acknowledged": true/false, + * "failed_indices": [ + * { + * "index": "index1", + * "error": "Low disk threshold watermark breached" + * }, + * { + * "index": "index2", + * "error": "Index is not a remote store backed index" + * } + * ] + * } + */ +public class HotToWarmTieringResponse extends AcknowledgedResponse { + + private List failedIndices; + + public HotToWarmTieringResponse(boolean acknowledged, List indicesResults) { + super(acknowledged); + this.failedIndices = (indicesResults == null) ? new LinkedList<>() : indicesResults; + } + + public HotToWarmTieringResponse(StreamInput in) throws IOException { + super(in); + failedIndices = Collections.unmodifiableList(in.readList(IndexResult::new)); + } + + public HotToWarmTieringResponse failedIndices(List failedIndices) { + this.failedIndices = failedIndices; + return this; + } + + public List getFailedIndices() { + return this.failedIndices; + } + + public void addIndexResult(String indexName, String failureReason) { + failedIndices.add(new IndexResult(indexName, failureReason)); + } + + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeList(this.failedIndices); + } + + protected void addCustomFields(XContentBuilder builder, Params params) throws IOException { + super.addCustomFields(builder, params); + builder.startArray("failed_indices"); + + for (IndexResult failedIndex : failedIndices) { + failedIndex.toXContent(builder, params); + } + builder.endArray(); + } + + @Override + public String toString() { + return Strings.toString(MediaTypeRegistry.JSON, this); + } + + public static class IndexResult implements Writeable, ToXContentFragment { + private final String index; + private final String failureReason; + + public IndexResult(String index, String failureReason) { + this.index = index; + this.failureReason = failureReason; + } + + IndexResult(StreamInput in) throws IOException { + this.index = in.readString(); + this.failureReason = in.readString(); + } + + public void writeTo(StreamOutput out) throws IOException { + out.writeString(index); + out.writeString(failureReason); + } + + public String getIndex() { + return index; + } + + public String getFailureReason() { + return failureReason; + } + + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("index", index); + builder.field("error", failureReason); + return builder.endObject(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + IndexResult that = (IndexResult) o; + return Objects.equals(index, that.index) && Objects.equals(failureReason, that.failureReason); + } + + @Override + public int hashCode() { + int result = Objects.hashCode(index); + result = 31 * result + Objects.hashCode(failureReason); + return result; + } + + @Override + public String toString() { + return Strings.toString(MediaTypeRegistry.JSON, this); + } + } +} diff --git a/server/src/main/java/org/opensearch/action/tiering/RestWarmTieringAction.java b/server/src/main/java/org/opensearch/action/tiering/RestWarmTieringAction.java new file mode 100644 index 0000000000000..54ca12ac7d7ba --- /dev/null +++ b/server/src/main/java/org/opensearch/action/tiering/RestWarmTieringAction.java @@ -0,0 +1,57 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.tiering; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestToXContentListener; + +import java.util.List; + +import static java.util.Collections.singletonList; +import static org.opensearch.core.common.Strings.splitStringByCommaToArray; +import static org.opensearch.rest.RestRequest.Method.POST; + +/** + * Rest Tiering API class to move index from hot to warm + * + * @opensearch.experimental + */ +public class RestWarmTieringAction extends BaseRestHandler { + + private final String TARGET_TIER = "warm"; + + @Override + public List routes() { + return singletonList(new RestHandler.Route(POST, "/{index}/_tier/" + TARGET_TIER)); + } + + @Override + public String getName() { + return "warm_tiering_action"; + } + + @Override + protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + final TieringIndexRequest tieringIndexRequest = new TieringIndexRequest( + TARGET_TIER, + splitStringByCommaToArray(request.param("index")) + ); + tieringIndexRequest.timeout(request.paramAsTime("timeout", tieringIndexRequest.timeout())); + tieringIndexRequest.clusterManagerNodeTimeout( + request.paramAsTime("cluster_manager_timeout", tieringIndexRequest.clusterManagerNodeTimeout()) + ); + tieringIndexRequest.waitForCompletion(request.paramAsBoolean("wait_for_completion", false)); + return channel -> client.admin() + .cluster() + .execute(WarmTieringAction.INSTANCE, tieringIndexRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/main/java/org/opensearch/action/tiering/TieringIndexRequest.java b/server/src/main/java/org/opensearch/action/tiering/TieringIndexRequest.java new file mode 100644 index 0000000000000..ea9ae84f680ae --- /dev/null +++ b/server/src/main/java/org/opensearch/action/tiering/TieringIndexRequest.java @@ -0,0 +1,171 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.tiering; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.IndicesRequest; +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.action.support.master.AcknowledgedRequest; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Locale; +import java.util.Objects; + +import static org.opensearch.action.ValidateActions.addValidationError; + +/** + * Represents the tiering request for indices + * to move to a different tier + * + * @opensearch.experimental + */ +@ExperimentalApi +public class TieringIndexRequest extends AcknowledgedRequest implements IndicesRequest.Replaceable { + + private String[] indices; + private Tier targetTier; + private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, false, true, false); + private boolean waitForCompletion; + + public TieringIndexRequest() {} + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (indices == null || indices.length == 0) { + validationException = addValidationError("Mandatory parameter - indices is missing from the request", validationException); + } + if (targetTier == null) { + validationException = addValidationError("Mandatory parameter - tier is missing from the request", validationException); + } + if (Tier.HOT.equals(targetTier)) { + validationException = addValidationError("The specified tiering to hot is not supported yet", validationException); + } + return validationException; + } + + public TieringIndexRequest(String targetTier, String... indices) { + this.targetTier = Tier.fromString(targetTier); + this.indices = indices; + } + + public TieringIndexRequest(StreamInput in) throws IOException { + super(in); + indices = in.readStringArray(); + targetTier = Tier.fromString(in.readString()); + indicesOptions = IndicesOptions.readIndicesOptions(in); + waitForCompletion = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(indices); + out.writeString(targetTier.value()); + indicesOptions.writeIndicesOptions(out); + out.writeBoolean(waitForCompletion); + } + + @Override + public String[] indices() { + return indices; + } + + @Override + public IndicesOptions indicesOptions() { + return indicesOptions; + } + + @Override + public TieringIndexRequest indices(String... indices) { + this.indices = indices; + return this; + } + + public TieringIndexRequest indicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = indicesOptions; + return this; + } + + /** + * If this parameter is set to true the operation will wait for completion of tiering process before returning. + * + * @param waitForCompletion if true the operation will wait for completion + * @return this request + */ + public TieringIndexRequest waitForCompletion(boolean waitForCompletion) { + this.waitForCompletion = waitForCompletion; + return this; + } + + /** + * Returns wait for completion setting + * + * @return true if the operation will wait for completion + */ + public boolean waitForCompletion() { + return waitForCompletion; + } + + public Tier tier() { + return targetTier; + } + + public TieringIndexRequest tier(Tier tier) { + this.targetTier = tier; + return this; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TieringIndexRequest that = (TieringIndexRequest) o; + return clusterManagerNodeTimeout.equals(that.clusterManagerNodeTimeout) + && timeout.equals(that.timeout) + && Objects.equals(indicesOptions, that.indicesOptions) + && Arrays.equals(indices, that.indices) + && targetTier.equals(that.targetTier) + && waitForCompletion == that.waitForCompletion; + } + + @Override + public int hashCode() { + return Objects.hash(clusterManagerNodeTimeout, timeout, indicesOptions, waitForCompletion, Arrays.hashCode(indices)); + } + + @ExperimentalApi + public enum Tier { + HOT, + WARM; + + public static Tier fromString(String name) { + String upperCase = name.trim().toUpperCase(Locale.ROOT); + if (HOT.name().equals(upperCase)) { + return HOT; + } + if (WARM.name().equals(upperCase)) { + return WARM; + } + throw new IllegalArgumentException("Tiering type [" + name + "] is not supported. Supported types are " + HOT + " and " + WARM); + } + + public String value() { + return name().toLowerCase(Locale.ROOT); + } + } +} diff --git a/server/src/main/java/org/opensearch/action/tiering/TransportWarmTieringAction.java b/server/src/main/java/org/opensearch/action/tiering/TransportWarmTieringAction.java new file mode 100644 index 0000000000000..cfd64ce4b620c --- /dev/null +++ b/server/src/main/java/org/opensearch/action/tiering/TransportWarmTieringAction.java @@ -0,0 +1,102 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.tiering; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; +import org.opensearch.client.Client; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.index.IndexNotFoundException; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.tiering.HotToWarmTieringService; +import org.opensearch.transport.TransportService; + +import java.io.IOException; + +/** + * Transport Tiering API class to move index from hot to warm + * + * @opensearch.experimental + */ +public class TransportWarmTieringAction extends TransportClusterManagerNodeAction { + + private static final Logger logger = LogManager.getLogger(TransportWarmTieringAction.class); + private final HotToWarmTieringService tieringService; + private final Client client; + + @Inject + public TransportWarmTieringAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + HotToWarmTieringService tieringService, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + Client client + ) { + super( + WarmTieringAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + TieringIndexRequest::new, + indexNameExpressionResolver + ); + this.client = client; + this.tieringService = tieringService; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected HotToWarmTieringResponse read(StreamInput in) throws IOException { + return new HotToWarmTieringResponse(in); + } + + @Override + protected ClusterBlockException checkBlock(TieringIndexRequest request, ClusterState state) { + ClusterBlockException blockException = state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + if (blockException == null) { + for (String index : request.indices()) { + try { + blockException = state.blocks() + .indicesBlockedException( + ClusterBlockLevel.METADATA_WRITE, + indexNameExpressionResolver.concreteIndexNames(clusterService.state(), request.indicesOptions(), index) + ); + } catch (IndexNotFoundException e) { + logger.debug("Index [{}] not found: {}", index, e); + } + } + } + return blockException; + } + + @Override + protected void clusterManagerOperation( + TieringIndexRequest request, + ClusterState state, + ActionListener listener + ) throws Exception { + tieringService.tier(request, listener); + } +} diff --git a/server/src/main/java/org/opensearch/action/tiering/WarmTieringAction.java b/server/src/main/java/org/opensearch/action/tiering/WarmTieringAction.java new file mode 100644 index 0000000000000..b006a40011603 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/tiering/WarmTieringAction.java @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.tiering; + +import org.opensearch.action.ActionType; + +/** + * Tiering action class to move index from hot to warm + * + * @opensearch.experimental + */ +public class WarmTieringAction extends ActionType { + + public static final WarmTieringAction INSTANCE = new WarmTieringAction(); + public static final String NAME = "indices:admin/tiering/warm"; + + public WarmTieringAction() { + super(NAME, HotToWarmTieringResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java index db10ad61c7d6d..470533bf14fb6 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java @@ -11,6 +11,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.index.IndexModule; /** * {@link RoutingPool} defines the different node types based on the assigned capabilities. The methods @@ -58,6 +59,8 @@ public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation all * @return {@link RoutingPool} for the given index. */ public static RoutingPool getIndexPool(IndexMetadata indexMetadata) { - return indexMetadata.isRemoteSnapshot() ? REMOTE_CAPABLE : LOCAL_ONLY; + return indexMetadata.isRemoteSnapshot() + || IndexModule.DataLocalityType.PARTIAL.name() + .equals(indexMetadata.getSettings().get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey())) ? REMOTE_CAPABLE : LOCAL_ONLY; } } diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index 4c494a6b35153..9c06cdd809fc8 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -48,6 +48,7 @@ import org.opensearch.common.CheckedFunction; import org.opensearch.common.SetOnce; import org.opensearch.common.TriFunction; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.logging.DeprecationLogger; import org.opensearch.common.settings.Setting; @@ -173,6 +174,26 @@ public final class IndexModule { Property.NodeScope ); + /** + * Index setting which used to determine how the data is cached locally fully or partially + */ + public static final Setting INDEX_STORE_LOCALITY_SETTING = new Setting<>( + "index.store.data_locality", + DataLocalityType.FULL.name(), + DataLocalityType::getValueOf, + Property.IndexScope, + Property.NodeScope, + Property.PrivateIndex + ); + + public static final Setting INDEX_TIERING_STATE = new Setting<>( + "index.tiering.state", + TieringState.HOT.name(), + Function.identity(), + Property.IndexScope, + Property.PrivateIndex + ); + /** Which lucene file extensions to load with the mmap directory when using hybridfs store. This settings is ignored if {@link #INDEX_STORE_HYBRID_NIO_EXTENSIONS} is set. * This is an expert setting. * @see Lucene File Extensions. @@ -658,6 +679,49 @@ public static Type defaultStoreType(final boolean allowMmap) { } } + /** + * Indicates the locality of the data - whether it will be cached fully or partially + */ + @ExperimentalApi + public enum DataLocalityType { + /** + * Indicates that all the data will be cached locally + */ + FULL, + /** + * Indicates that only a subset of the data will be cached locally + */ + PARTIAL; + + private static final Map LOCALITY_TYPES; + + static { + final Map localityTypes = new HashMap<>(values().length); + for (final DataLocalityType dataLocalityType : values()) { + localityTypes.put(dataLocalityType.name(), dataLocalityType); + } + LOCALITY_TYPES = Collections.unmodifiableMap(localityTypes); + } + + public static DataLocalityType getValueOf(final String localityType) { + Objects.requireNonNull(localityType, "No locality type given."); + final String localityTypeName = toRootUpperCase(localityType.trim()); + final DataLocalityType type = LOCALITY_TYPES.get(localityTypeName); + if (type != null) { + return type; + } + throw new IllegalArgumentException("Unknown locality type constant [" + localityType + "]."); + } + } + + @ExperimentalApi + public enum TieringState { + HOT, + HOT_TO_WARM, + WARM, + WARM_TO_HOT; + } + public IndexService newIndexService( IndexService.IndexCreationContext indexCreationContext, NodeEnvironment environment, diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index a91dce4ece126..d755699579cd1 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -252,6 +252,7 @@ import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.RunnableTaskExecutionListener; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.tiering.HotToWarmTieringService; import org.opensearch.transport.RemoteClusterService; import org.opensearch.transport.Transport; import org.opensearch.transport.TransportInterceptor; @@ -1175,6 +1176,14 @@ protected Node( remoteClusterStateService ); + HotToWarmTieringService tieringService = new HotToWarmTieringService( + settings, + clusterService, + clusterModule.getIndexNameExpressionResolver(), + clusterModule.getAllocationService(), + clusterInfoService + ); + final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor( settings, clusterService::state, @@ -1371,6 +1380,7 @@ protected Node( b.bind(TransportNodesSnapshotsStatus.class).toInstance(nodesSnapshotsStatus); b.bind(RestoreService.class).toInstance(restoreService); b.bind(RemoteStoreRestoreService.class).toInstance(remoteStoreRestoreService); + b.bind(HotToWarmTieringService.class).toInstance(tieringService); b.bind(RerouteService.class).toInstance(rerouteService); b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator); b.bind(FsHealthService.class).toInstance(fsHealthService); diff --git a/server/src/main/java/org/opensearch/tiering/HotToWarmTieringService.java b/server/src/main/java/org/opensearch/tiering/HotToWarmTieringService.java new file mode 100644 index 0000000000000..78c23ec4d37d6 --- /dev/null +++ b/server/src/main/java/org/opensearch/tiering/HotToWarmTieringService.java @@ -0,0 +1,654 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tiering; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.Supplier; +import org.opensearch.action.tiering.HotToWarmTieringRequestContext; +import org.opensearch.action.tiering.HotToWarmTieringResponse; +import org.opensearch.action.tiering.TieringIndexRequest; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterInfoService; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterStateListener; +import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.NotClusterManagerException; +import org.opensearch.cluster.block.ClusterBlocks; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.routing.allocation.DiskThresholdSettings; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexNotFoundException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING; +import static org.opensearch.index.IndexModule.INDEX_STORE_LOCALITY_SETTING; +import static org.opensearch.index.IndexModule.INDEX_TIERING_STATE; +import static org.opensearch.tiering.TieringServiceValidator.validateHotToWarm; + +/** + * Service responsible for tiering indices from hot to warm + * @opensearch.experimental + */ +public class HotToWarmTieringService extends AbstractLifecycleComponent implements ClusterStateListener { + private static final Logger logger = LogManager.getLogger(HotToWarmTieringService.class); + + protected final ClusterService clusterService; + + protected final IndexNameExpressionResolver indexNameExpressionResolver; + + protected final AllocationService allocationService; + + protected final ClusterInfoService clusterInfoService; + protected final DiskThresholdSettings diskThresholdSettings; + public static final Setting HOT_TO_WARM_START_TIME_SETTING = Setting.longSetting( + "index.tiering.hot_to_warm.start_time", + System.currentTimeMillis(), + 0, + Setting.Property.IndexScope, + Setting.Property.PrivateIndex + ); + + public static final Setting HOT_TO_WARM_END_TIME_SETTING = Setting.longSetting( + "index.tiering.hot_to_warm.end_time", + 0, + 0, + Setting.Property.IndexScope, + Setting.Property.PrivateIndex + ); + private Map> indexShardTieringStatus = new HashMap<>(); + private Map indexToRequestUuid = new HashMap<>(); + private Map requestUuidToRequestContext = new HashMap<>(); + + @Inject + public HotToWarmTieringService( + Settings settings, + ClusterService clusterService, + IndexNameExpressionResolver indexNameExpressionResolver, + AllocationService allocationService, + ClusterInfoService clusterInfoService + ) { + super(); + this.clusterService = clusterService; + this.indexNameExpressionResolver = indexNameExpressionResolver; + this.allocationService = allocationService; + this.clusterInfoService = clusterInfoService; + this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterService.getClusterSettings()); + + if (DiscoveryNode.isClusterManagerNode(settings) && FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX)) { + clusterService.addListener(this); + } + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + // TODO add handling for master switch, dangling indices, master reload + if (event.routingTableChanged()) { + updateIndexShardStatus(event.state()); + } + } + + public void completeRequestLevelTiering(Index index) { + String requestUuid = indexToRequestUuid.get(index.getName()); + if (requestUuid == null) { + logger.debug("Tiering for the index [{}] is already marked as completed", index.getName()); + return; + } + HotToWarmTieringRequestContext requestContext = requestUuidToRequestContext.get(requestUuid); + assert requestContext != null : "requestContext cannot be null for request uuid " + requestUuid; + if (requestContext.isRequestProcessingComplete()) { + logger.info("Tiering is completed for the request [{}]", requestContext); + for (Index indexToRemove : requestContext.getAcceptedIndices()) { + indexToRequestUuid.remove(indexToRemove.getName()); + } + requestUuidToRequestContext.remove(requestUuid); + if (requestContext.getRequest().waitForCompletion()) { + requestContext.getListener().onResponse(requestContext.constructResponse()); + } + } + } + + private void updateIndexShardStatus(ClusterState clusterState) { + List routingTable; + Set relocationCompletedIndices = new HashSet<>(); + Set failedIndices = new HashSet<>(); + for (Index index : indexShardTieringStatus.keySet()) { + try { + // Ensure index is not deleted + routingTable = clusterState.routingTable().allShards(index.getName()); + } catch (IndexNotFoundException ex) { + // Index already deleted nothing to do + logger.warn("Index [{}] deleted before hot to warm relocation finished", index.getName()); + HotToWarmTieringRequestContext requestContext = requestUuidToRequestContext.get(indexToRequestUuid.get(index.getName())); + requestContext.addToNotFound(index.getName()); + requestContext.removeFromInProgress(index); + indexShardTieringStatus.remove(index); + completeRequestLevelTiering(index); + continue; + } + + Map shardTieringStatusMap = indexShardTieringStatus.getOrDefault(index, new HashMap<>()); + List processingShards = shardTieringStatusMap.keySet() + .stream() + .filter( + shardId -> shardTieringStatusMap.get(shardId).state() == State.INIT + || shardTieringStatusMap.get(shardId).state() == State.PROCESSING + ) + .collect(Collectors.toList()); + if (processingShards.isEmpty()) { + // No shards are in processing state, nothing to do + // This means that tiering for the index is completed - either failed or successful + continue; + } + boolean relocationCompleted = true; + for (ShardRouting shard : routingTable) { + if (shardTieringStatusMap.get(shard.shardId()) != null + && (State.SUCCESSFUL.equals(shardTieringStatusMap.get(shard.shardId()).state()) + || State.FAILED.equals(shardTieringStatusMap.get(shard.shardId()).state()))) { + continue; + } + State tieringState; + String reason = null; + boolean isShardFoundOnSearchNode = clusterState.getNodes().get(shard.currentNodeId()).isSearchNode(); + boolean isShardRelocatingToSearchNode = clusterState.getNodes().get(shard.relocatingNodeId()).isSearchNode(); + + if (shard.started() && isShardFoundOnSearchNode) { + tieringState = State.SUCCESSFUL; + } else if (shard.unassigned()) { + tieringState = State.FAILED; + relocationCompleted = false; + failedIndices.add(index); + reason = "Shard is unassigned due to " + shard.unassignedInfo().getReason(); + } else if ((shard.initializing() && !isShardFoundOnSearchNode) || (shard.relocating() && !isShardRelocatingToSearchNode)) { + tieringState = State.FAILED; + relocationCompleted = false; + failedIndices.add(index); + reason = "Shard with current state: " + + shard.state().toString() + + "is neither allocated nor relocating to the search node, " + + "current node: " + + shard.currentNodeId() + + ", relocating node: " + + shard.relocatingNodeId(); + } else { + tieringState = State.PROCESSING; + relocationCompleted = false; + } + shardTieringStatusMap.put( + shard.shardId(), + new ShardTieringStatus(shard.currentNodeId(), shard.state(), tieringState, reason) + ); + } + indexShardTieringStatus.put(index, shardTieringStatusMap); + if (relocationCompleted) { + logger.info("Hot to warm relocation completed for index [{}]", index.getName()); + relocationCompletedIndices.add(index); + } + } + if (!relocationCompletedIndices.isEmpty()) { + processSuccessfullyTieredIndices(relocationCompletedIndices); + } + if (!failedIndices.isEmpty()) { + processFailedIndices(failedIndices); + } + } + + private void processFailedIndices(Set indices) { + clusterService.submitStateUpdateTask("process hot to warm tiering for failed indices", new ClusterStateUpdateTask(Priority.URGENT) { + + @Override + public ClusterState execute(ClusterState currentState) { + final Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()); + final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable()); + + for (Index index : indices) { + final IndexMetadata indexMetadata = metadataBuilder.get(index.getName()); + Settings.Builder indexSettingsBuilder = Settings.builder().put(indexMetadata.getSettings()); + + // update tiering settings here + indexSettingsBuilder.put(INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.FULL); + indexSettingsBuilder.put(INDEX_TIERING_STATE.getKey(), IndexModule.TieringState.HOT); + indexSettingsBuilder.put(HOT_TO_WARM_END_TIME_SETTING.getKey(), System.currentTimeMillis()); + + // Update number of replicas to 1 in case the number of replicas is lesser than 1 + if (Integer.parseInt(metadataBuilder.getSafe(index).getSettings().get(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey())) < 1) { + final String[] indices = new String[] { index.getName() }; + routingTableBuilder.updateNumberOfReplicas(1, indices); + metadataBuilder.updateNumberOfReplicas(1, indices); + } + + // Update index settings version + final IndexMetadata.Builder builder = IndexMetadata.builder(indexMetadata).settings(indexSettingsBuilder); + builder.settingsVersion(1 + builder.settingsVersion()); + metadataBuilder.put(builder); + } + + final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); + + ClusterState updatedState = ClusterState.builder(currentState) + .metadata(metadataBuilder) + .routingTable(routingTableBuilder.build()) + .blocks(blocks) + .build(); + + // now, reroute to trigger shard relocation for shards to go back to hot nodes + updatedState = allocationService.reroute(updatedState, "hot to warm revert tiering"); + + return updatedState; + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn( + (Supplier) () -> new ParameterizedMessage("failed to complete hot to warm tiering for indices " + "[{}]", indices), + e + ); + } + + @Override + public void onNoLongerClusterManager(String source) { + this.onFailure(source, new NotClusterManagerException("no longer cluster manager. source: [" + source + "]")); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + logger.info("Cluster state updated for source " + source); + for (Index index : indices) { + HotToWarmTieringRequestContext requestContext = requestUuidToRequestContext.get( + indexToRequestUuid.get(index.getName()) + ); + requestContext.addToFailed(index.getName()); + requestContext.removeFromInProgress(index); + completeRequestLevelTiering(index); + } + } + }); + } + + private void processSuccessfullyTieredIndices(Set indices) { + clusterService.submitStateUpdateTask("complete hot to warm tiering", new ClusterStateUpdateTask(Priority.URGENT) { + + @Override + public ClusterState execute(ClusterState currentState) { + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable()); + final Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()); + + for (Index index : indices) { + final IndexMetadata indexMetadata = metadataBuilder.get(index.getName()); + Settings.Builder indexSettingsBuilder = Settings.builder().put(indexMetadata.getSettings()); + // put/update tiering settings here + indexSettingsBuilder.put(INDEX_TIERING_STATE.getKey(), IndexModule.TieringState.WARM); + indexSettingsBuilder.put(HOT_TO_WARM_END_TIME_SETTING.getKey(), System.currentTimeMillis()); + + // Update index settings version + final IndexMetadata.Builder builder = IndexMetadata.builder(indexMetadata).settings(indexSettingsBuilder); + builder.settingsVersion(1 + builder.settingsVersion()); + metadataBuilder.put(builder); + } + + final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); + + ClusterState updatedState = ClusterState.builder(currentState) + .metadata(metadataBuilder) + .routingTable(routingTableBuilder.build()) + .blocks(blocks) + .build(); + + return updatedState; + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn( + (Supplier) () -> new ParameterizedMessage("failed to complete hot to warm tiering for indices " + "[{}]", indices), + e + ); + } + + @Override + public void onNoLongerClusterManager(String source) { + this.onFailure(source, new NotClusterManagerException("no longer cluster manager. source: [" + source + "]")); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + logger.info("Cluster state updated for source " + source); + for (Index index : indices) { + indexShardTieringStatus.remove(index); + HotToWarmTieringRequestContext requestContext = requestUuidToRequestContext.get( + indexToRequestUuid.get(index.getName()) + ); + requestContext.addToSuccessful(index.getName()); + requestContext.removeFromInProgress(index); + completeRequestLevelTiering(index); + } + } + }); + } + + /** + * Tier indices from hot to warm + * @param request - tiering request + * @param listener - call back listener + */ + public void tier(final TieringIndexRequest request, final ActionListener listener) { + clusterService.submitStateUpdateTask("start hot to warm tiering", new ClusterStateUpdateTask(Priority.URGENT) { + final HotToWarmTieringRequestContext hotToWarmTieringRequestContext = new HotToWarmTieringRequestContext(listener, request); + + @Override + public ClusterState execute(ClusterState currentState) { + List concreteIndices = new ArrayList<>(); + for (String index : request.indices()) { + Index[] resolvedIndices = null; + try { + resolvedIndices = indexNameExpressionResolver.concreteIndices(currentState, request.indicesOptions(), index); + } catch (IndexNotFoundException e) { + hotToWarmTieringRequestContext.addToNotFound(e.getIndex().getName()); + logger.debug("Index [{}] not found: {}", index, e); + } + if (resolvedIndices != null) { + concreteIndices.addAll(Arrays.asList(resolvedIndices)); + } + } + String requestUuid = hotToWarmTieringRequestContext.getRequestUuid(); + HotToWarmTieringRequestContext validationResult = validateHotToWarm( + currentState, + concreteIndices.toArray(Index.EMPTY_ARRAY), + clusterInfoService.getClusterInfo(), + diskThresholdSettings + ); + hotToWarmTieringRequestContext.setAcceptedIndices(validationResult.getAcceptedIndices()); + hotToWarmTieringRequestContext.setRejectedIndices(validationResult.getRejectedIndices()); + hotToWarmTieringRequestContext.setInProgressIndices(validationResult.getAcceptedIndices()); + requestUuidToRequestContext.put(requestUuid, hotToWarmTieringRequestContext); + final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable()); + final Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()); + + for (Index index : validationResult.getAcceptedIndices()) { + final IndexMetadata indexMetadata = metadataBuilder.get(index.getName()); + Settings.Builder indexSettingsBuilder = Settings.builder().put(indexMetadata.getSettings()); + // put additional settings here + indexSettingsBuilder.put(INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL); + indexSettingsBuilder.put(INDEX_TIERING_STATE.getKey(), IndexModule.TieringState.HOT_TO_WARM); + indexSettingsBuilder.put(HOT_TO_WARM_START_TIME_SETTING.getKey(), System.currentTimeMillis()); + indexSettingsBuilder.put(HOT_TO_WARM_END_TIME_SETTING.getKey(), -1); + + // Update number of replicas to 1 in case the number of replicas is greater than 1 + if (Integer.parseInt(metadataBuilder.getSafe(index).getSettings().get(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey())) > 1) { + final String[] indices = new String[] { index.getName() }; + routingTableBuilder.updateNumberOfReplicas(1, indices); + metadataBuilder.updateNumberOfReplicas(1, indices); + } + + // Update index settings version + final IndexMetadata.Builder builder = IndexMetadata.builder(indexMetadata).settings(indexSettingsBuilder); + builder.settingsVersion(1 + builder.settingsVersion()); + metadataBuilder.put(builder); + Map shardTieringStatus = new HashMap<>(); + for (int shard = 0; shard < indexMetadata.getNumberOfShards(); shard++) { + ShardId shardId = new ShardId(indexMetadata.getIndex(), shard); + shardTieringStatus.put(shardId, new ShardTieringStatus(currentState.nodes().getLocalNodeId(), null)); + } + indexShardTieringStatus.put(index, shardTieringStatus); + indexToRequestUuid.put(index.getName(), requestUuid); + } + + final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); + + ClusterState updatedState = ClusterState.builder(currentState) + .metadata(metadataBuilder) + .routingTable(routingTableBuilder.build()) + .blocks(blocks) + .build(); + + // now, reroute to trigger shard relocation for the dedicated case + updatedState = allocationService.reroute(updatedState, "hot to warm tiering"); + + return updatedState; + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn( + (Supplier) () -> new ParameterizedMessage( + "failed to start warm tiering for indices " + "[{}]", + (Object) request.indices() + ), + e + ); + listener.onFailure(e); + } + + @Override + public void onNoLongerClusterManager(String source) { + this.onFailure(source, new NotClusterManagerException("no longer cluster manager. source: [" + source + "]")); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + logger.info("Cluster state updated for source " + source); + if (!request.waitForCompletion()) { + listener.onResponse(hotToWarmTieringRequestContext.constructResponse()); + } + } + + @Override + public TimeValue timeout() { + return request.clusterManagerNodeTimeout(); + } + }); + } + + @Override + protected void doStart() {} + + @Override + protected void doStop() {} + + @Override + protected void doClose() throws IOException {} + + /** + * Represents status of a tiering shard + * + * @opensearch.experimental + */ + @ExperimentalApi + public static class ShardTieringStatus { + private State state; + private ShardRoutingState shardRoutingState; + private String nodeId; + private String reason; + + private ShardTieringStatus() {} + + /** + * Constructs a new shard tiering status in initializing state on the given node + * + * @param nodeId node id + */ + public ShardTieringStatus(String nodeId) { + this(nodeId, null); + } + + /** + * Constructs a new shard tiering status in with specified state on the given node + * + * @param nodeId node id + * @param shardRoutingState shard state + */ + public ShardTieringStatus(String nodeId, ShardRoutingState shardRoutingState) { + this(nodeId, shardRoutingState, State.INIT, null); + } + + /** + * Constructs a new shard tiering status in with specified state on the given node with specified failure reason + * + * @param nodeId node id + * @param state shard state + * @param reason failure reason + */ + public ShardTieringStatus(String nodeId, ShardRoutingState shardRoutingState, State state, String reason) { + this.nodeId = nodeId; + this.state = state; + this.reason = reason; + this.shardRoutingState = shardRoutingState; + } + + /** + * Returns current state + * + * @return current state + */ + public State state() { + return state; + } + + /** + * Returns node id of the node where shared is getting tiered + * + * @return node id + */ + public String nodeId() { + return nodeId; + } + + /** + * Returns failure reason + * + * @return failure reason + */ + public String reason() { + return reason; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ShardTieringStatus status = (ShardTieringStatus) o; + return state == status.state && Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason); + } + + @Override + public int hashCode() { + return Objects.hash(state, nodeId, reason); + } + } + + /** + * Tiering state + * + * @opensearch.experimental + */ + @ExperimentalApi + public enum State { + /** + * Initializing state + */ + INIT((byte) 0), + /** + * Processing state + */ + PROCESSING((byte) 1), + /** + * Tiering finished successfully + */ + SUCCESSFUL((byte) 2), + /** + * Tiering failed + */ + FAILED((byte) 3); + + private final byte value; + + /** + * Constructs new state + * + * @param value state code + */ + State(byte value) { + this.value = value; + } + + /** + * Returns state code + * + * @return state code + */ + public byte value() { + return value; + } + + /** + * Returns true if tiering completed (either successfully or with failure) + * + * @return true if tiering completed + */ + public boolean completed() { + return this == SUCCESSFUL || this == FAILED; + } + + /** + * Returns state corresponding to state code + * + * @param value stat code + * @return state + */ + public static State fromValue(byte value) { + switch (value) { + case 0: + return INIT; + case 1: + return PROCESSING; + case 2: + return SUCCESSFUL; + case 3: + return FAILED; + default: + throw new IllegalArgumentException("No tiering state for value [" + value + "]"); + } + } + } +} diff --git a/server/src/main/java/org/opensearch/tiering/TieringServiceValidator.java b/server/src/main/java/org/opensearch/tiering/TieringServiceValidator.java new file mode 100644 index 0000000000000..3a1ec7d708bd4 --- /dev/null +++ b/server/src/main/java/org/opensearch/tiering/TieringServiceValidator.java @@ -0,0 +1,334 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tiering; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.tiering.HotToWarmTieringRequestContext; +import org.opensearch.cluster.ClusterInfo; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.DiskUsage; +import org.opensearch.cluster.health.ClusterHealthStatus; +import org.opensearch.cluster.health.ClusterIndexHealth; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.DiskThresholdSettings; +import org.opensearch.common.collect.Tuple; +import org.opensearch.core.index.Index; +import org.opensearch.index.IndexModule; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.opensearch.index.IndexModule.INDEX_TIERING_STATE; + +/** + * Validator class to validate the tiering requests of the index + * @opensearch.experimental + */ +public class TieringServiceValidator { + + private static final Logger logger = LogManager.getLogger(TieringServiceValidator.class); + + /** + * Validates the tiering request for indices going from hot to warm tier + * + * @param currentState current cluster state + * @param indices array of indices to be validated + * @param clusterInfo the current nodes usage info for the cluster + * @param diskThresholdSettings the disk threshold settings of the cluster + * @return array of indices that are accepted for tiering + */ + public static HotToWarmTieringRequestContext validateHotToWarm( + final ClusterState currentState, + final Index[] indices, + final ClusterInfo clusterInfo, + final DiskThresholdSettings diskThresholdSettings + ) { + String indexNames = Arrays.stream(indices).map(Index::getName).collect(Collectors.joining(", ")); + validateSearchNodes(currentState, indexNames); + validateDiskThresholdWaterMarkNotBreached(currentState, clusterInfo, diskThresholdSettings); + + Set notHotIndices = new HashSet<>(); + Set notRemoteStoreBacked = new HashSet<>(); + Set closedIndices = new HashSet<>(); + Set redIndices = new HashSet<>(); + Set acceptedIndices = new HashSet<>(); + + HotToWarmTieringRequestContext hotToWarmTieringRequestContext = new HotToWarmTieringRequestContext(); + + for (Index index : indices) { + if (!validateHotIndex(currentState, index)) { + notHotIndices.add(index.getName()); + continue; + } + if (!validateRemoteStoreIndex(currentState, index)) { + notRemoteStoreBacked.add(index.getName()); + continue; + } + if (!validateOpenIndex(currentState, index)) { + closedIndices.add(index.getName()); + continue; + } + if (!validateIndexHealth(currentState, index)) { + redIndices.add(index.getName()); + continue; + } + acceptedIndices.add(index); + } + + if (!notHotIndices.isEmpty()) { + logger.warn( + "Rejecting tiering request for indices [{}] because they are already in the warm tier or the tiering is currently in progress.", + notHotIndices + ); + hotToWarmTieringRequestContext.addToRejected( + notHotIndices, + "index is already in the warm tier or the tiering is currently in progress for the index" + ); + } + if (!notRemoteStoreBacked.isEmpty()) { + logger.warn("Rejecting tiering request for indices [{}] because they are not remote store enabled.", notRemoteStoreBacked); + hotToWarmTieringRequestContext.addToRejected(notRemoteStoreBacked, "index is not backed up by the remote store"); + } + if (!closedIndices.isEmpty()) { + logger.warn("Rejecting tiering request for indices [{}] because they are closed.", closedIndices); + hotToWarmTieringRequestContext.addToRejected(closedIndices, "index is closed"); + } + if (!redIndices.isEmpty()) { + logger.warn("Rejecting tiering request for indices [{}] because they are red.", redIndices); + hotToWarmTieringRequestContext.addToRejected(redIndices, "index is red"); + } + + Tuple, Set> eligibleNodesCapacity = validateEligibleNodesCapacity(clusterInfo, currentState, indices, 0); + hotToWarmTieringRequestContext.addToAccepted(eligibleNodesCapacity.v1()); + hotToWarmTieringRequestContext.addToRejected(eligibleNodesCapacity.v2(), "insufficient node capacity"); + logger.info("Successfully accepted indices for tiering [{}]", acceptedIndices); + + return hotToWarmTieringRequestContext; + } + + /** + * Validates that there are eligible nodes with the search role in the current cluster state. + * (only for the dedicated case - to be removed later) + * + * @param currentState the current cluster state + * @param indexNames the names of the indices being validated + * @throws IllegalArgumentException if there are no eligible search nodes in the cluster + */ + public static void validateSearchNodes(final ClusterState currentState, final String indexNames) { + if (getEligibleNodes(currentState).isEmpty()) { + final String errorMsg = "Rejecting tiering request for indices [" + + indexNames + + "] because there are no nodes found with the search role"; + logger.warn(errorMsg); + throw new IllegalArgumentException(errorMsg); + } + } + + /** + * Validates that the specified index has the remote store setting enabled. + * + * @param state the current cluster state + * @param index the index to be validated + * @return true if the remote store setting is enabled for the index, false otherwise + */ + public static boolean validateRemoteStoreIndex(final ClusterState state, final Index index) { + return IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(state.metadata().getIndexSafe(index).getSettings()); + } + + /** + * Validates that the specified index is in the "hot" tiering state. + * + * @param state the current cluster state + * @param index the index to be validated + * @return true if the index is in the "hot" tiering state, false otherwise + */ + public static boolean validateHotIndex(final ClusterState state, final Index index) { + return IndexModule.TieringState.HOT.name().equals(INDEX_TIERING_STATE.get(state.metadata().getIndexSafe(index).getSettings())); + } + + /** + * Validates the health of the specified index in the current cluster state. + * + * @param currentState the current cluster state + * @param index the index to be validated + * @return true if the index health is not in the "red" state, false otherwise + */ + public static boolean validateIndexHealth(final ClusterState currentState, final Index index) { + final IndexRoutingTable indexRoutingTable = currentState.routingTable().index(index); + final IndexMetadata indexMetadata = currentState.metadata().index(index); + final ClusterIndexHealth indexHealth = new ClusterIndexHealth(indexMetadata, indexRoutingTable); + return !ClusterHealthStatus.RED.equals(indexHealth.getStatus()); + } + + /** + * Validates that the specified index is in the open state in the current cluster state. + * + * @param currentState the current cluster state + * @param index the index to be validated + * @return true if the index is in the open state, false otherwise + */ + public static boolean validateOpenIndex(final ClusterState currentState, final Index index) { + return currentState.metadata().index(index).getState() == IndexMetadata.State.OPEN; + } + + /** + * Validates that the disk threshold low watermark is not breached on all the eligible nodes in the cluster. + * + * @param currentState the current cluster state + * @param clusterInfo the current nodes usage info for the cluster + * @param diskThresholdSettings the disk threshold settings of the cluster + * @throws IllegalArgumentException if the disk threshold low watermark is breached on all eligible nodes + */ + public static void validateDiskThresholdWaterMarkNotBreached( + final ClusterState currentState, + final ClusterInfo clusterInfo, + final DiskThresholdSettings diskThresholdSettings + ) { + final Map usages = clusterInfo.getNodeLeastAvailableDiskUsages(); + if (usages == null) { + logger.trace("skipping monitor as no disk usage information is available"); + return; + } + final Set nodeIds = getEligibleNodes(currentState).stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); + int count = 0; + for (String node : usages.keySet()) { + if (nodeIds.contains(node)) { + if (usages.get(node).getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes()) { + logger.warn("Disk threshold low watermark breached on the node [{}]", node); + count++; + } + } + } + + if (count == nodeIds.size()) { + throw new IllegalArgumentException( + "Disk threshold low watermark breached on all the search nodes, hence no new index can be accepted for tiering " + ); + } + } + + /** + * Validates the capacity of eligible nodes in the cluster to accommodate the specified indices. + * + * @param clusterInfo the current nodes usage info for the cluster + * @param currentState the current cluster state + * @param indices the indices to be validated + * @param bufferSpace the amount of buffer space to reserve on each node + * @return a list of indices that can be accommodated by the eligible nodes + */ + public static Tuple, Set> validateEligibleNodesCapacity( + ClusterInfo clusterInfo, + final ClusterState currentState, + final Index[] indices, + final long bufferSpace + ) { + + final Set eligibleNodeIds = getEligibleNodes(currentState).stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); + long totalAvailableBytesInWarmTier = getTotalAvailableBytesInWarmTier( + clusterInfo.getNodeLeastAvailableDiskUsages(), + eligibleNodeIds, + bufferSpace + ); + + Map indexSizes = new HashMap<>(); + for (Index index : indices) { + indexSizes.put(index, getTotalIndexSize(currentState, clusterInfo, index.getName())); + } + + if (indexSizes.values().stream().mapToLong(Long::longValue).sum() < totalAvailableBytesInWarmTier) { + return new Tuple<>(Set.of(indices), Set.of()); + } + HashMap sortedIndexSizes = indexSizes.entrySet() + .stream() + .sorted(Map.Entry.comparingByValue()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, HashMap::new)); + + long requestIndexBytes = 0L; + Set acceptedIndices = new HashSet<>(); + Set rejectedIndices = new HashSet<>(); + for (Index index : sortedIndexSizes.keySet()) { + requestIndexBytes += sortedIndexSizes.get(index); + if (requestIndexBytes < totalAvailableBytesInWarmTier) { + acceptedIndices.add(index); + } else { + rejectedIndices.add(index.getName()); + } + } + if (!rejectedIndices.isEmpty()) { + logger.warn( + "Rejecting tiering request for indices [{}] because total available bytes is less than the " + "requested bytes.", + rejectedIndices + ); + } + return new Tuple<>(acceptedIndices, rejectedIndices); + } + + /** + * Calculates the total size of the specified index in the cluster. + * Note: This function only accounts for the primary shard size. + * + * @param clusterState the current state of the cluster + * @param clusterInfo the current nodes usage info for the cluster + * @param index the name of the index for which the total size is to be calculated + * @return the total size of the specified index in the cluster + */ + public static long getTotalIndexSize(ClusterState clusterState, ClusterInfo clusterInfo, String index) { + long totalIndexSize = 0; + List shardRoutings = clusterState.routingTable().allShards(index); + for (ShardRouting shardRouting : shardRoutings) { + if (shardRouting.primary()) { + totalIndexSize += clusterInfo.getShardSize(shardRouting, 0); + } + } + return totalIndexSize; + } + + /** + * Calculates the total available bytes in the warm tier of the cluster. + * + * @param usages the current disk usage of the cluster + * @param nodeIds the set of warm nodes ids in the cluster + * @param bufferSpace the amount of buffer space to reserve on each node + * @return the total available bytes in the warm tier + */ + public static long getTotalAvailableBytesInWarmTier( + final Map usages, + final Set nodeIds, + final long bufferSpace + ) { + long totalAvailableBytes = 0; + for (String node : nodeIds) { + final long totalBytesOnNode = usages.get(node).getTotalBytes(); + final long totalAvailBytesOnNode = usages.get(node).getFreeBytes(); + final long diskSpaceBufferBytes = Math.round(Math.min(bufferSpace, totalBytesOnNode * .2)); + totalAvailableBytes += Math.max(0, (totalAvailBytesOnNode - diskSpaceBufferBytes)); + } + return totalAvailableBytes; + } + + /** + * Retrieves the set of eligible(search) nodes from the current cluster state. + * + * @param currentState the current cluster state + * @return the set of eligible nodes + */ + public static Set getEligibleNodes(final ClusterState currentState) { + final Map nodes = currentState.getNodes().getDataNodes(); + return nodes.values().stream().filter(DiscoveryNode::isSearchNode).collect(Collectors.toSet()); + } +} diff --git a/server/src/test/java/org/opensearch/rest/action/tiering/TieringRequestTests.java b/server/src/test/java/org/opensearch/rest/action/tiering/TieringRequestTests.java new file mode 100644 index 0000000000000..659fa894148e0 --- /dev/null +++ b/server/src/test/java/org/opensearch/rest/action/tiering/TieringRequestTests.java @@ -0,0 +1,85 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.tiering; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.action.tiering.TieringIndexRequest; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +import static org.hamcrest.CoreMatchers.equalTo; + +public class TieringRequestTests extends OpenSearchTestCase { + + public void testTieringRequestWithListOfIndices() { + TieringIndexRequest request = new TieringIndexRequest(); + request.indices("foo", "bar", "baz"); + request.tier(TieringIndexRequest.Tier.WARM); + request.indices("test") + .indicesOptions(IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())); + ActionRequestValidationException validationException = request.validate(); + assertNull(validationException); + } + + public void testTieringRequestWithIndexPattern() { + TieringIndexRequest request = new TieringIndexRequest(); + request.indices("foo-*"); + request.tier(TieringIndexRequest.Tier.WARM); + ActionRequestValidationException validationException = request.validate(); + assertNull(validationException); + } + + public void testTieringRequestWithNullOrEmptyIndices() { + TieringIndexRequest request = new TieringIndexRequest(); + ActionRequestValidationException validationException = request.validate(); + assertNotNull(validationException); + request.indices(""); + validationException = request.validate(); + assertNotNull(validationException); + } + + public void testTieringRequestWithNullOrNotSupportedTier() { + TieringIndexRequest request = new TieringIndexRequest(); + request.indices("test"); + ActionRequestValidationException validationException = request.validate(); + assertNotNull(validationException); + request.tier(TieringIndexRequest.Tier.HOT); + validationException = request.validate(); + assertNotNull(validationException); + } + + public void testTieringTypeFromString() { + expectThrows(IllegalArgumentException.class, () -> TieringIndexRequest.Tier.fromString("tier")); + } + + public void testSerDeOfTieringRequest() throws IOException { + TieringIndexRequest request = new TieringIndexRequest("warm", "test"); + try (BytesStreamOutput out = new BytesStreamOutput()) { + request.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + final TieringIndexRequest deserializedRequest = new TieringIndexRequest(in); + assertEquals(request, deserializedRequest); + } + } + } + + public void testTieringRequestEquals() { + final TieringIndexRequest original = new TieringIndexRequest("warm", "test"); + original.indicesOptions(IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())); + final TieringIndexRequest expected = new TieringIndexRequest("warm", original.indices()); + expected.indicesOptions(original.indicesOptions()); + assertThat(expected, equalTo(original)); + assertThat(expected.indices(), equalTo(original.indices())); + assertThat(expected.indicesOptions(), equalTo(original.indicesOptions())); + } +} diff --git a/server/src/test/java/org/opensearch/rest/action/tiering/TieringRequestValidatorTests.java b/server/src/test/java/org/opensearch/rest/action/tiering/TieringRequestValidatorTests.java new file mode 100644 index 0000000000000..cfd96a9bebb0b --- /dev/null +++ b/server/src/test/java/org/opensearch/rest/action/tiering/TieringRequestValidatorTests.java @@ -0,0 +1,414 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.tiering; + +import org.opensearch.Version; +import org.opensearch.action.admin.cluster.node.stats.NodeStats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.indices.stats.CommonStats; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.admin.indices.stats.IndicesStatsTests; +import org.opensearch.action.admin.indices.stats.ShardStats; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.Index; +import org.opensearch.index.IndexModule; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.index.store.StoreStats; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.monitor.fs.FsInfo; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.tiering.TieringServiceValidator; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import static java.util.Collections.emptyList; + +public class TieringRequestValidatorTests extends OpenSearchTestCase { + + private TieringServiceValidator validator = new TieringServiceValidator(); + + public void testValidateSearchNodes() { + + ClusterState clusterStateWithSearchNodes = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(createNodes(2, 0, 0)) + .build(); + + // throws no errors + validator.validateSearchNodes(clusterStateWithSearchNodes, "test_index"); + } + + public void testValidateSearchNodesThrowError() { + + ClusterState clusterStateWithNoSearchNodes = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(createNodes(0, 1, 1)) + .build(); + // throws error + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> validator.validateSearchNodes(clusterStateWithNoSearchNodes, "test") + ); + } + + public void testValidRemoteStoreIndex() { + + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + + ClusterState clusterState1 = buildClusterState( + indexName, + indexUuid, + Settings.builder() + .put(IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT) + .build() + ); + + assertTrue(validator.validateRemoteStoreIndex(clusterState1, new Index(indexName, indexUuid))); + } + + public void testNotRemoteStoreIndex() { + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + assertFalse( + validator.validateRemoteStoreIndex(buildClusterState(indexName, indexUuid, Settings.EMPTY), new Index(indexName, indexUuid)) + ); + } + + public void testValidHotIndex() { + + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + assertTrue(validator.validateHotIndex(buildClusterState(indexName, indexUuid, Settings.EMPTY), new Index(indexName, indexUuid))); + } + + public void testNotHotIndex() { + + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + + IndexModule.TieringState tieringState = randomBoolean() ? IndexModule.TieringState.HOT_TO_WARM : IndexModule.TieringState.WARM; + + ClusterState clusterState = buildClusterState( + indexName, + indexUuid, + Settings.builder().put(IndexModule.INDEX_TIERING_STATE.getKey(), tieringState).build() + ); + assertFalse(validator.validateHotIndex(clusterState, new Index(indexName, indexUuid))); + } + + public void testValidateIndexHealth() { + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + ClusterState clusterState = buildClusterState(indexName, indexUuid, Settings.EMPTY); + assertTrue(validator.validateIndexHealth(clusterState, new Index(indexName, indexUuid))); + } + + public void testValidOpenIndex() { + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + assertTrue(validator.validateOpenIndex(buildClusterState(indexName, indexUuid, Settings.EMPTY), new Index(indexName, indexUuid))); + } + + public void testNotValidOpenIndex() { + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + assertFalse( + validator.validateOpenIndex( + buildClusterState(indexName, indexUuid, Settings.EMPTY, IndexMetadata.State.CLOSE), + new Index(indexName, indexUuid) + ) + ); + } + + public void testValidateDiskThresholdWaterMarkNotBreached() { + DiscoveryNode node1 = new DiscoveryNode( + "node1", + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE), + Version.CURRENT + ); + DiscoveryNode node2 = new DiscoveryNode( + "node2", + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE), + Version.CURRENT + ); + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(DiscoveryNodes.builder().add(node1).add(node2).build()) + .build(); + + NodesStatsResponse nodesStatsResponse = createNodesStatsResponse( + List.of(node1, node2), + clusterState.getClusterName(), + 100, + 90, + 100 + ); + // throws no error + // validator.validateDiskThresholdWaterMarkNotBreached(clusterState, nodesStatsResponse, new ByteSizeValue(20)); + } + + public void testValidateDiskThresholdWaterMarkNotBreachedThrowsError() { + DiscoveryNode node1 = new DiscoveryNode( + "node1", + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE), + Version.CURRENT + ); + DiscoveryNode node2 = new DiscoveryNode( + "node2", + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE), + Version.CURRENT + ); + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(DiscoveryNodes.builder().add(node1).add(node2).build()) + .build(); + + NodesStatsResponse nodesStatsResponse = createNodesStatsResponse(List.of(node1, node2), clusterState.getClusterName(), 100, 90, 20); + // throws error + // expectThrows(IllegalArgumentException.class, () -> validator.validateDiskThresholdWaterMarkNotBreached(clusterState, + // nodesStatsResponse, new ByteSizeValue(100))); + } + + // public void testValidateEligibleNodesCapacity() { + // String indexUuid = UUID.randomUUID().toString(); + // String indexName = "test_index"; + // + // DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), + // Collections.emptyMap(), Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE), Version.CURRENT); + // DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), + // Collections.emptyMap(), Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE), Version.CURRENT); + // ClusterState clusterState = ClusterState.builder(buildClusterState(indexName, indexUuid, Settings.EMPTY)) + // .nodes(DiscoveryNodes.builder() + // .add(node1) + // .add(node2) + // .build()) + // .build(); + // + // NodesStatsResponse nodesStatsResponse = createNodesStatsResponse(List.of(node1, node2), clusterState.getClusterName(), 100, 90, 100); + // Index index = new Index(indexName, indexUuid); + // // Setting shard per size as 10 b + // long sizePerShard = 10; + // IndicesStatsResponse indicesStatsResponse = createIndicesStatsResponse(index, clusterState, sizePerShard); + // Index[] indices = new Index[]{index}; + // List acceptedIndices = validator.validateEligibleNodesCapacity(nodesStatsResponse, indicesStatsResponse, clusterState, + // indices, 0); + // + // assertEquals(indices.length, acceptedIndices.size()); + // assertEquals(Arrays.asList(indices), acceptedIndices); + // + // nodesStatsResponse = createNodesStatsResponse(List.of(node1, node2), clusterState.getClusterName(), 100, 90, 100); + // // Setting shard per size as 10 mb + // sizePerShard = 10 * 1024; + // indicesStatsResponse = createIndicesStatsResponse(index, clusterState, sizePerShard); + // acceptedIndices = validator.validateEligibleNodesCapacity(nodesStatsResponse, indicesStatsResponse, clusterState, indices, 0); + // + // assertEquals(0, acceptedIndices.size()); + // } + + public void testGetTotalAvailableBytesInWarmTier() { + DiscoveryNode node1 = new DiscoveryNode( + "node1", + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE), + Version.CURRENT + ); + DiscoveryNode node2 = new DiscoveryNode( + "node2", + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE), + Version.CURRENT + ); + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(DiscoveryNodes.builder().add(node1).add(node2).build()) + .build(); + NodesStatsResponse nodesStatsResponse = createNodesStatsResponse( + List.of(node1, node2), + clusterState.getClusterName(), + 100, + 90, + 100 + ); + // assertEquals(200, validator.getTotalAvailableBytesInWarmTier(nodesStatsResponse, Set.of(node1, node2), 0)); + } + + public void testEligibleNodes() { + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(createNodes(2, 0, 0)) + .build(); + + assertEquals(2, validator.getEligibleNodes(clusterState).size()); + + clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(createNodes(0, 1, 1)) + .build(); + assertEquals(0, validator.getEligibleNodes(clusterState).size()); + } + + private static ClusterState buildClusterState(String indexName, String indexUuid, Settings settings) { + return buildClusterState(indexName, indexUuid, settings, IndexMetadata.State.OPEN); + } + + private static ClusterState buildClusterState(String indexName, String indexUuid, Settings settings, IndexMetadata.State state) { + Settings combinedSettings = Settings.builder().put(settings).put(createDefaultIndexSettings(indexUuid)).build(); + + Metadata metadata = Metadata.builder().put(IndexMetadata.builder(indexName).settings(combinedSettings).state(state)).build(); + + RoutingTable routingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + + return ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .build(); + } + + private static Settings createDefaultIndexSettings(String indexUuid) { + return Settings.builder() + .put("index.version.created", Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, indexUuid) + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 2) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) + .build(); + } + + private DiscoveryNodes createNodes(int numOfSearchNodes, int numOfDataNodes, int numOfIngestNodes) { + DiscoveryNodes.Builder discoveryNodesBuilder = DiscoveryNodes.builder(); + for (int i = 0; i < numOfSearchNodes; i++) { + discoveryNodesBuilder.add( + new DiscoveryNode( + "node-s" + i, + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE), + Version.CURRENT + ) + ); + } + for (int i = 0; i < numOfDataNodes; i++) { + discoveryNodesBuilder.add( + new DiscoveryNode( + "node-d" + i, + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Collections.singleton(DiscoveryNodeRole.DATA_ROLE), + Version.CURRENT + ) + ); + } + for (int i = 0; i < numOfIngestNodes; i++) { + discoveryNodesBuilder.add( + new DiscoveryNode( + "node-i" + i, + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Collections.singleton(DiscoveryNodeRole.INGEST_ROLE), + Version.CURRENT + ) + ); + } + return discoveryNodesBuilder.build(); + } + + private ShardStats[] createShardStats(ShardRouting[] shardRoutings, long sizePerShard, Index index) { + ShardStats[] shardStats = new ShardStats[shardRoutings.length]; + for (int i = 1; i <= shardRoutings.length; i++) { + ShardRouting shardRouting = shardRoutings[i - 1]; + Path path = createTempDir().resolve("indices").resolve(index.getUUID()).resolve(String.valueOf(shardRouting.id())); + CommonStats commonStats = new CommonStats(); + commonStats.store = new StoreStats(sizePerShard, 0); + shardStats[i - 1] = new ShardStats( + shardRouting, + new ShardPath(false, path, path, shardRouting.shardId()), + commonStats, + null, + null, + null + ); + } + return shardStats; + } + + private IndicesStatsResponse createIndicesStatsResponse(Index index, ClusterState clusterState, long sizePerShard) { + ShardRouting[] shardRoutings = clusterState.routingTable() + .allShards(index.getName()) + .stream() + .filter(ShardRouting::primary) + .toArray(ShardRouting[]::new); + + ShardStats[] shardStats = createShardStats(shardRoutings, sizePerShard, index); + return IndicesStatsTests.newIndicesStatsResponse(shardStats, shardStats.length, shardStats.length, 0, emptyList()); + } + + private NodesStatsResponse createNodesStatsResponse( + List nodes, + ClusterName clusterName, + long total, + long free, + long available + ) { + FsInfo fsInfo = new FsInfo(0, null, new FsInfo.Path[] { new FsInfo.Path("/path", "/dev/sda", total, free, available) }); + List nodeStats = new ArrayList<>(); + for (DiscoveryNode node : nodes) { + nodeStats.add( + new NodeStats( + node, + 0, + null, + null, + null, + null, + null, + fsInfo, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ) + ); + } + return new NodesStatsResponse(clusterName, nodeStats, new ArrayList<>()); + } +}