From 5569b43b65eef2b2b512ed9f43044489b9089985 Mon Sep 17 00:00:00 2001 From: Neetika Singhal Date: Thu, 16 May 2024 16:12:31 -0700 Subject: [PATCH] Add rest, transport layer changes for Hot to warm tiering - dedicated setup Signed-off-by: Neetika Singhal --- .../org/opensearch/action/ActionModule.java | 9 + .../tiering/HotToWarmTieringAction.java | 26 ++ .../HotToWarmTieringRequestContext.java | 233 ++++++++++++ .../tiering/HotToWarmTieringResponse.java | 140 ++++++++ .../action/tiering/RestWarmTieringAction.java | 57 +++ .../action/tiering/TieringIndexRequest.java | 175 +++++++++ .../TransportHotToWarmTieringAction.java | 128 +++++++ .../action/tiering/package-info.java | 36 ++ .../cluster/routing/RoutingPool.java | 5 +- .../org/opensearch/index/IndexModule.java | 20 ++ .../tiering/TieringServiceValidator.java | 332 ++++++++++++++++++ .../org/opensearch/tiering/package-info.java | 36 ++ .../HotToWarmTieringResponseTests.java | 101 ++++++ .../action/tiering/TieringRequestTests.java | 85 +++++ .../tiering/TieringRequestValidatorTests.java | 312 ++++++++++++++++ 15 files changed, 1694 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/org/opensearch/action/tiering/HotToWarmTieringAction.java create mode 100644 server/src/main/java/org/opensearch/action/tiering/HotToWarmTieringRequestContext.java create mode 100644 server/src/main/java/org/opensearch/action/tiering/HotToWarmTieringResponse.java create mode 100644 server/src/main/java/org/opensearch/action/tiering/RestWarmTieringAction.java create mode 100644 server/src/main/java/org/opensearch/action/tiering/TieringIndexRequest.java create mode 100644 server/src/main/java/org/opensearch/action/tiering/TransportHotToWarmTieringAction.java create mode 100644 server/src/main/java/org/opensearch/action/tiering/package-info.java create mode 100644 server/src/main/java/org/opensearch/tiering/TieringServiceValidator.java create mode 100644 server/src/main/java/org/opensearch/tiering/package-info.java create mode 100644 server/src/test/java/org/opensearch/rest/action/tiering/HotToWarmTieringResponseTests.java create mode 100644 server/src/test/java/org/opensearch/rest/action/tiering/TieringRequestTests.java create mode 100644 server/src/test/java/org/opensearch/rest/action/tiering/TieringRequestValidatorTests.java diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 16c15f553951c..0fd5e9aa74707 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -288,6 +288,9 @@ import org.opensearch.action.termvectors.TransportMultiTermVectorsAction; import org.opensearch.action.termvectors.TransportShardMultiTermsVectorAction; import org.opensearch.action.termvectors.TransportTermVectorsAction; +import org.opensearch.action.tiering.HotToWarmTieringAction; +import org.opensearch.action.tiering.RestWarmTieringAction; +import org.opensearch.action.tiering.TransportHotToWarmTieringAction; import org.opensearch.action.update.TransportUpdateAction; import org.opensearch.action.update.UpdateAction; import org.opensearch.client.node.NodeClient; @@ -634,6 +637,9 @@ public void reg actions.register(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class); actions.register(CloneSnapshotAction.INSTANCE, TransportCloneSnapshotAction.class); actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class); + if 
(FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX)) { + actions.register(HotToWarmTieringAction.INSTANCE, TransportHotToWarmTieringAction.class); + } actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class); actions.register(ClusterAddWeightedRoutingAction.INSTANCE, TransportAddWeightedRoutingAction.class); @@ -966,6 +972,9 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestNodeAttrsAction()); registerHandler.accept(new RestRepositoriesAction()); registerHandler.accept(new RestSnapshotAction()); + if (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX)) { + registerHandler.accept(new RestWarmTieringAction()); + } registerHandler.accept(new RestTemplatesAction()); // Point in time API diff --git a/server/src/main/java/org/opensearch/action/tiering/HotToWarmTieringAction.java b/server/src/main/java/org/opensearch/action/tiering/HotToWarmTieringAction.java new file mode 100644 index 0000000000000..1457578ff657a --- /dev/null +++ b/server/src/main/java/org/opensearch/action/tiering/HotToWarmTieringAction.java @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.tiering; + +import org.opensearch.action.ActionType; + +/** + * Tiering action to move indices from hot to warm + * + * @opensearch.experimental + */ +public class HotToWarmTieringAction extends ActionType { + + public static final HotToWarmTieringAction INSTANCE = new HotToWarmTieringAction(); + public static final String NAME = "indices:admin/tier/hot_to_warm"; + + public HotToWarmTieringAction() { + super(NAME, HotToWarmTieringResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/tiering/HotToWarmTieringRequestContext.java b/server/src/main/java/org/opensearch/action/tiering/HotToWarmTieringRequestContext.java new file mode 100644 index 0000000000000..7ef07aa750fd2 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/tiering/HotToWarmTieringRequestContext.java @@ -0,0 +1,233 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.tiering; + +import org.opensearch.common.UUIDs; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.Index; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Context class to hold indices to be tiered per request. It also holds + * the listener per request to mark the request as complete once all + * tiering operations are completed. 
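+ *
+ * A minimal illustrative lifecycle sketch (an assumed flow, not the authoritative one; it uses only methods
+ * declared in this class, and the {@code listener}, {@code request} and {@code index} variables are placeholders):
+ * <pre>
+ * HotToWarmTieringRequestContext ctx = new HotToWarmTieringRequestContext(listener, request);
+ * ctx.setAcceptedAndInProgressIndices(acceptedIndices);
+ * // as each accepted index finishes tiering:
+ * ctx.removeFromInProgress(index);
+ * ctx.addToSuccessful(index.getName());
+ * if (ctx.isRequestProcessingComplete()) {
+ *     ctx.getListener().onResponse(ctx.constructResponse());
+ * }
+ * </pre>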
+ * + * @opensearch.experimental + */ + +public class HotToWarmTieringRequestContext { + private Set concreteIndices; + private Set acceptedIndices; + private Map rejectedIndices; + private Set notFoundIndices; + private Set inProgressIndices; + private Set successfulIndices; + private Map failedIndices; + private ActionListener actionListener; + private TieringIndexRequest request; + private String requestUuid; + + public HotToWarmTieringRequestContext() { + this.acceptedIndices = new HashSet<>(); + this.rejectedIndices = new HashMap<>(); + this.notFoundIndices = new HashSet<>(); + this.successfulIndices = new HashSet<>(); + this.failedIndices = new HashMap<>(); + this.inProgressIndices = new HashSet<>(); + this.requestUuid = UUIDs.randomBase64UUID(); + } + + public HotToWarmTieringRequestContext(ActionListener actionListener, TieringIndexRequest request) { + this(); + this.actionListener = actionListener; + this.request = request; + } + + public Set getConcreteIndices() { + return concreteIndices; + } + + public void setConcreteIndices(Set concreteIndices) { + this.concreteIndices = concreteIndices; + } + + public Set getAcceptedIndices() { + return acceptedIndices; + } + + public void setAcceptedIndices(Set acceptedIndices) { + this.acceptedIndices = acceptedIndices; + } + + public void setAcceptedAndInProgressIndices(Set acceptedIndices) { + this.acceptedIndices = acceptedIndices; + this.inProgressIndices.addAll(acceptedIndices); + } + + public Map getRejectedIndices() { + return rejectedIndices; + } + + public Set getNotFoundIndices() { + return notFoundIndices; + } + + public void setNotFoundIndices(Set notFoundIndices) { + this.notFoundIndices = notFoundIndices; + } + + public void setRejectedIndices(Map rejectedIndices) { + this.rejectedIndices = rejectedIndices; + } + + public void addToNotFound(String indexName) { + notFoundIndices.add(indexName); + } + + /** + * Method to move indices from success list to failed list with corresponding reasons to be sent in response + * @param indicesToFail - sets of indices that failed validation with same reason + * @param failureReason - reason for not accepting migration for this index + */ + public void addToRejected(Set indicesToFail, String failureReason) { + for (String index : indicesToFail) { + addToRejected(index, failureReason); + } + } + + /** + * Method to move index from success list to failed list with corresponding reasons to be sent in response + * @param indexName - indexName that failed validation + * @param failureReason - reason for not accepting migration for this index + */ + public void addToRejected(String indexName, String failureReason) { + rejectedIndices.put(indexName, failureReason); + } + + public void addToAccepted(Index index) { + acceptedIndices.add(index); + } + + public void addToAccepted(Set indices) { + acceptedIndices.addAll(indices); + } + + public Set getSuccessfulIndices() { + return successfulIndices; + } + + public void setSuccessfulIndices(Set successfulIndices) { + this.successfulIndices = successfulIndices; + } + + public void addToSuccessful(String index) { + successfulIndices.add(index); + } + + public Map getFailedIndices() { + return failedIndices; + } + + public void setFailedIndices(Map failedIndices) { + this.failedIndices = failedIndices; + } + + public void addToFailed(String index, String reason) { + failedIndices.put(index, reason); + } + + public ActionListener getListener() { + return actionListener; + } + + public void setActionListener(ActionListener actionListener) { + 
this.actionListener = actionListener; + } + + public TieringIndexRequest getRequest() { + return request; + } + + public void setRequest(TieringIndexRequest request) { + this.request = request; + } + + public String getRequestUuid() { + return requestUuid; + } + + public void setRequestUuid(String requestUuid) { + this.requestUuid = requestUuid; + } + + public Set getInProgressIndices() { + return inProgressIndices; + } + + public void setInProgressIndices(Set inProgressIndices) { + this.inProgressIndices = inProgressIndices; + } + + public void addToInProgress(Index index) { + inProgressIndices.add(index); + } + + public void removeFromInProgress(Index index) { + inProgressIndices.remove(index); + } + + public boolean isRequestProcessingComplete() { + return inProgressIndices.isEmpty(); + } + + public HotToWarmTieringResponse constructResponse() { + final List indicesResult = new LinkedList<>(); + for (Map.Entry rejectedIndex : rejectedIndices.entrySet()) { + indicesResult.add(new HotToWarmTieringResponse.IndexResult(rejectedIndex.getKey(), rejectedIndex.getValue())); + } + for (String index : notFoundIndices) { + indicesResult.add(new HotToWarmTieringResponse.IndexResult(index, "Index not found")); + } + for (Map.Entry failedIndex : failedIndices.entrySet()) { + indicesResult.add(new HotToWarmTieringResponse.IndexResult(failedIndex.getKey(), failedIndex.getValue())); + } + return new HotToWarmTieringResponse(acceptedIndices.size() > 0, indicesResult); + } + + @Override + public String toString() { + return "HotToWarmTieringRequestContext{" + + "acceptedIndices=" + + acceptedIndices + + ", rejectedIndices=" + + rejectedIndices + + ", notFoundIndices=" + + notFoundIndices + + ", inProgressIndices=" + + inProgressIndices + + ", successfulIndices=" + + successfulIndices + + ", failedIndices=" + + failedIndices + + ", actionListener=" + + actionListener + + ", request=" + + request + + ", requestUuid='" + + requestUuid + + '\'' + + '}'; + } +} diff --git a/server/src/main/java/org/opensearch/action/tiering/HotToWarmTieringResponse.java b/server/src/main/java/org/opensearch/action/tiering/HotToWarmTieringResponse.java new file mode 100644 index 0000000000000..79cf634e1b6d4 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/tiering/HotToWarmTieringResponse.java @@ -0,0 +1,140 @@ +package org.opensearch.action.tiering; + +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.core.common.Strings; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; + +/** + * Response object for an {@link TieringIndexRequest} which is sent to client after the initial verification of the request + * by the backend service. 
The format of the response object will be as below: + * + * { + * "acknowledged": true/false, + * "failed_indices": [ + * { + * "index": "index1", + * "error": "Low disk threshold watermark breached" + * }, + * { + * "index": "index2", + * "error": "Index is not a remote store backed index" + * } + * ] + * } + */ +public class HotToWarmTieringResponse extends AcknowledgedResponse { + + private List failedIndices; + + public HotToWarmTieringResponse(boolean acknowledged, List indicesResults) { + super(acknowledged); + this.failedIndices = (indicesResults == null) ? new LinkedList<>() : indicesResults; + } + + public HotToWarmTieringResponse(StreamInput in) throws IOException { + super(in); + failedIndices = Collections.unmodifiableList(in.readList(IndexResult::new)); + } + + public HotToWarmTieringResponse failedIndices(List failedIndices) { + this.failedIndices = failedIndices; + return this; + } + + public List getFailedIndices() { + return this.failedIndices; + } + + public void addIndexResult(String indexName, String failureReason) { + failedIndices.add(new IndexResult(indexName, failureReason)); + } + + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeList(this.failedIndices); + } + + protected void addCustomFields(XContentBuilder builder, Params params) throws IOException { + super.addCustomFields(builder, params); + builder.startArray("failed_indices"); + + for (IndexResult failedIndex : failedIndices) { + failedIndex.toXContent(builder, params); + } + builder.endArray(); + } + + @Override + public String toString() { + return Strings.toString(MediaTypeRegistry.JSON, this); + } + + /** + * Inner class to represent the result of a failed index for tiering. + */ + public static class IndexResult implements Writeable, ToXContentFragment { + private final String index; + private final String failureReason; + + public IndexResult(String index, String failureReason) { + this.index = index; + this.failureReason = failureReason; + } + + IndexResult(StreamInput in) throws IOException { + this.index = in.readString(); + this.failureReason = in.readString(); + } + + public void writeTo(StreamOutput out) throws IOException { + out.writeString(index); + out.writeString(failureReason); + } + + public String getIndex() { + return index; + } + + public String getFailureReason() { + return failureReason; + } + + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("index", index); + builder.field("error", failureReason); + return builder.endObject(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + IndexResult that = (IndexResult) o; + return Objects.equals(index, that.index) && Objects.equals(failureReason, that.failureReason); + } + + @Override + public int hashCode() { + int result = Objects.hashCode(index); + result = 31 * result + Objects.hashCode(failureReason); + return result; + } + + @Override + public String toString() { + return Strings.toString(MediaTypeRegistry.JSON, this); + } + } +} diff --git a/server/src/main/java/org/opensearch/action/tiering/RestWarmTieringAction.java b/server/src/main/java/org/opensearch/action/tiering/RestWarmTieringAction.java new file mode 100644 index 0000000000000..ce648758597c7 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/tiering/RestWarmTieringAction.java @@ -0,0 +1,57 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * 
+ * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.tiering; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestToXContentListener; + +import java.util.List; + +import static java.util.Collections.singletonList; +import static org.opensearch.core.common.Strings.splitStringByCommaToArray; +import static org.opensearch.rest.RestRequest.Method.POST; + +/** + * Rest Tiering API action to move indices to warm tier + * + * @opensearch.experimental + */ +public class RestWarmTieringAction extends BaseRestHandler { + + private static final String TARGET_TIER = "warm"; + + @Override + public List routes() { + return singletonList(new RestHandler.Route(POST, "/{index}/_tier/" + TARGET_TIER)); + } + + @Override + public String getName() { + return "warm_tiering_action"; + } + + @Override + protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + final TieringIndexRequest tieringIndexRequest = new TieringIndexRequest( + TARGET_TIER, + splitStringByCommaToArray(request.param("index")) + ); + tieringIndexRequest.timeout(request.paramAsTime("timeout", tieringIndexRequest.timeout())); + tieringIndexRequest.clusterManagerNodeTimeout( + request.paramAsTime("cluster_manager_timeout", tieringIndexRequest.clusterManagerNodeTimeout()) + ); + tieringIndexRequest.waitForCompletion(request.paramAsBoolean("wait_for_completion", false)); + return channel -> client.admin() + .cluster() + .execute(HotToWarmTieringAction.INSTANCE, tieringIndexRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/main/java/org/opensearch/action/tiering/TieringIndexRequest.java b/server/src/main/java/org/opensearch/action/tiering/TieringIndexRequest.java new file mode 100644 index 0000000000000..2294bd4a7fc9f --- /dev/null +++ b/server/src/main/java/org/opensearch/action/tiering/TieringIndexRequest.java @@ -0,0 +1,175 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.action.tiering; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.IndicesRequest; +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.action.support.master.AcknowledgedRequest; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Locale; +import java.util.Objects; + +import static org.opensearch.action.ValidateActions.addValidationError; + +/** + * Represents the tiering request for indices to move to a different tier + * + * @opensearch.experimental + */ +@ExperimentalApi +public class TieringIndexRequest extends AcknowledgedRequest implements IndicesRequest.Replaceable { + + private String[] indices; + private Tier targetTier; + private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, false, true, false); + private boolean waitForCompletion; + + public TieringIndexRequest() {} + + public TieringIndexRequest(String targetTier, String... indices) { + this.targetTier = Tier.fromString(targetTier); + this.indices = indices; + } + + public TieringIndexRequest(StreamInput in) throws IOException { + super(in); + indices = in.readStringArray(); + targetTier = Tier.fromString(in.readString()); + indicesOptions = IndicesOptions.readIndicesOptions(in); + waitForCompletion = in.readBoolean(); + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (indices == null || indices.length == 0) { + validationException = addValidationError("Mandatory parameter - indices is missing from the request", validationException); + } + if (targetTier == null) { + validationException = addValidationError("Mandatory parameter - tier is missing from the request", validationException); + } + if (!Tier.WARM.equals(targetTier)) { + validationException = addValidationError("The specified tier is not supported", validationException); + } + return validationException; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(indices); + out.writeString(targetTier.value()); + indicesOptions.writeIndicesOptions(out); + out.writeBoolean(waitForCompletion); + } + + @Override + public String[] indices() { + return indices; + } + + @Override + public IndicesOptions indicesOptions() { + return indicesOptions; + } + + @Override + public TieringIndexRequest indices(String... indices) { + this.indices = indices; + return this; + } + + public TieringIndexRequest indicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = indicesOptions; + return this; + } + + /** + * If this parameter is set to true the operation will wait for completion of tiering process before returning. 
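+ *
+ * A minimal illustrative sketch of building such a request (index names are placeholders; the matching REST
+ * call added in this change is {@code POST /{index}/_tier/warm?wait_for_completion=true}):
+ * <pre>
+ * TieringIndexRequest request = new TieringIndexRequest("warm", "logs-2024-05-01", "logs-2024-05-02");
+ * request.waitForCompletion(true);
+ * assert request.validate() == null; // indices are present and the target tier is WARM
+ * </pre>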
+ * + * @param waitForCompletion if true the operation will wait for completion + * @return this request + */ + public TieringIndexRequest waitForCompletion(boolean waitForCompletion) { + this.waitForCompletion = waitForCompletion; + return this; + } + + /** + * Returns wait for completion setting + * + * @return true if the operation will wait for completion + */ + public boolean waitForCompletion() { + return waitForCompletion; + } + + public Tier tier() { + return targetTier; + } + + public TieringIndexRequest tier(Tier tier) { + this.targetTier = tier; + return this; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TieringIndexRequest that = (TieringIndexRequest) o; + return clusterManagerNodeTimeout.equals(that.clusterManagerNodeTimeout) + && timeout.equals(that.timeout) + && Objects.equals(indicesOptions, that.indicesOptions) + && Arrays.equals(indices, that.indices) + && targetTier.equals(that.targetTier) + && waitForCompletion == that.waitForCompletion; + } + + @Override + public int hashCode() { + return Objects.hash(clusterManagerNodeTimeout, timeout, indicesOptions, waitForCompletion, Arrays.hashCode(indices)); + } + + /** + * Represents the supported tiers for an index + * + * @opensearch.experimental + */ + @ExperimentalApi + public enum Tier { + HOT, + WARM; + + public static Tier fromString(String name) { + String upperCase = name.trim().toUpperCase(Locale.ROOT); + if (HOT.name().equals(upperCase)) { + return HOT; + } + if (WARM.name().equals(upperCase)) { + return WARM; + } + throw new IllegalArgumentException("Tiering type [" + name + "] is not supported. Supported types are " + HOT + " and " + WARM); + } + + public String value() { + return name().toLowerCase(Locale.ROOT); + } + } +} diff --git a/server/src/main/java/org/opensearch/action/tiering/TransportHotToWarmTieringAction.java b/server/src/main/java/org/opensearch/action/tiering/TransportHotToWarmTieringAction.java new file mode 100644 index 0000000000000..431e984c283fc --- /dev/null +++ b/server/src/main/java/org/opensearch/action/tiering/TransportHotToWarmTieringAction.java @@ -0,0 +1,128 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.action.tiering; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; +import org.opensearch.cluster.ClusterInfoService; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.routing.allocation.DiskThresholdSettings; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.index.Index; +import org.opensearch.index.IndexNotFoundException; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import static org.opensearch.tiering.TieringServiceValidator.validateHotToWarm; + +/** + * Transport Tiering action to move indices from hot to warm + * + * @opensearch.experimental + */ +public class TransportHotToWarmTieringAction extends TransportClusterManagerNodeAction { + + private static final Logger logger = LogManager.getLogger(TransportHotToWarmTieringAction.class); + private final ClusterInfoService clusterInfoService; + private final DiskThresholdSettings diskThresholdSettings; + + @Inject + public TransportHotToWarmTieringAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + ClusterInfoService clusterInfoService, + Settings settings + ) { + super( + HotToWarmTieringAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + TieringIndexRequest::new, + indexNameExpressionResolver + ); + this.clusterInfoService = clusterInfoService; + this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterService.getClusterSettings());; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected HotToWarmTieringResponse read(StreamInput in) throws IOException { + return new HotToWarmTieringResponse(in); + } + + @Override + protected ClusterBlockException checkBlock(TieringIndexRequest request, ClusterState state) { + ClusterBlockException blockException = state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + if (blockException == null) { + for (String index : request.indices()) { + try { + blockException = state.blocks() + .indicesBlockedException( + ClusterBlockLevel.METADATA_WRITE, + indexNameExpressionResolver.concreteIndexNames(clusterService.state(), request.indicesOptions(), true, index) + ); + } catch (IndexNotFoundException e) { + logger.debug("Index [{}] not found: {}", index, e); + throw e; + } + } + } + return blockException; + } + + @Override + protected void clusterManagerOperation( + TieringIndexRequest request, + ClusterState state, + ActionListener listener + ) throws Exception { + final HotToWarmTieringRequestContext hotToWarmTieringRequestContext = new HotToWarmTieringRequestContext(listener, request); + final Set concreteIndices = new HashSet<>(); + for (String index : 
request.indices()) { + final Index[] resolvedIndices = indexNameExpressionResolver.concreteIndices(state, request.indicesOptions(), true, index); + concreteIndices.addAll(Set.of(resolvedIndices)); + } + hotToWarmTieringRequestContext.setConcreteIndices(concreteIndices); + final HotToWarmTieringRequestContext validationResult = validateHotToWarm( + state, + concreteIndices.toArray(Index.EMPTY_ARRAY), + clusterInfoService.getClusterInfo(), + diskThresholdSettings + ); + hotToWarmTieringRequestContext.setAcceptedAndInProgressIndices(validationResult.getAcceptedIndices()); + hotToWarmTieringRequestContext.setRejectedIndices(validationResult.getRejectedIndices()); + + if (hotToWarmTieringRequestContext.getAcceptedIndices().isEmpty()) { + listener.onResponse(hotToWarmTieringRequestContext.constructResponse()); + return; + } + } +} diff --git a/server/src/main/java/org/opensearch/action/tiering/package-info.java b/server/src/main/java/org/opensearch/action/tiering/package-info.java new file mode 100644 index 0000000000000..225fe312bf189 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/tiering/package-info.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Actions that OpenSearch tiering can take to tier the indices + */ +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.action.tiering; diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java index db10ad61c7d6d..470533bf14fb6 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java @@ -11,6 +11,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.index.IndexModule; /** * {@link RoutingPool} defines the different node types based on the assigned capabilities. The methods @@ -58,6 +59,8 @@ public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation all * @return {@link RoutingPool} for the given index. */ public static RoutingPool getIndexPool(IndexMetadata indexMetadata) { - return indexMetadata.isRemoteSnapshot() ? REMOTE_CAPABLE : LOCAL_ONLY; + return indexMetadata.isRemoteSnapshot() + || IndexModule.DataLocalityType.PARTIAL.name() + .equals(indexMetadata.getSettings().get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey())) ? 
REMOTE_CAPABLE : LOCAL_ONLY; } } diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index 09b904394ee09..93ff1b78b1ac5 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -48,6 +48,7 @@ import org.opensearch.common.CheckedFunction; import org.opensearch.common.SetOnce; import org.opensearch.common.TriFunction; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.logging.DeprecationLogger; import org.opensearch.common.settings.Setting; @@ -174,6 +175,14 @@ public final class IndexModule { Property.NodeScope ); + public static final Setting INDEX_TIERING_STATE = new Setting<>( + "index.tiering.state", + TieringState.HOT.name(), + Function.identity(), + Property.IndexScope, + Property.PrivateIndex + ); + /** Which lucene file extensions to load with the mmap directory when using hybridfs store. This settings is ignored if {@link #INDEX_STORE_HYBRID_NIO_EXTENSIONS} is set. * This is an expert setting. * @see Lucene File Extensions. @@ -663,6 +672,17 @@ public static Type defaultStoreType(final boolean allowMmap) { } } + /** + * Represents the tiering state of the index. + */ + @ExperimentalApi + public enum TieringState { + HOT, + HOT_TO_WARM, + WARM, + WARM_TO_HOT; + } + public IndexService newIndexService( IndexService.IndexCreationContext indexCreationContext, NodeEnvironment environment, diff --git a/server/src/main/java/org/opensearch/tiering/TieringServiceValidator.java b/server/src/main/java/org/opensearch/tiering/TieringServiceValidator.java new file mode 100644 index 0000000000000..bd6096dc91093 --- /dev/null +++ b/server/src/main/java/org/opensearch/tiering/TieringServiceValidator.java @@ -0,0 +1,332 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.tiering; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.tiering.HotToWarmTieringRequestContext; +import org.opensearch.cluster.ClusterInfo; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.DiskUsage; +import org.opensearch.cluster.health.ClusterHealthStatus; +import org.opensearch.cluster.health.ClusterIndexHealth; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.DiskThresholdSettings; +import org.opensearch.common.collect.Tuple; +import org.opensearch.core.index.Index; +import org.opensearch.index.IndexModule; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.opensearch.index.IndexModule.INDEX_TIERING_STATE; + +/** + * Validator class to validate the tiering requests of the index + * @opensearch.experimental + */ +public class TieringServiceValidator { + + private static final Logger logger = LogManager.getLogger(TieringServiceValidator.class); + + /** + * Validates the tiering request for indices going from hot to warm tier + * + * @param currentState current cluster state + * @param indices array of indices to be validated + * @param clusterInfo the current nodes usage info for the cluster + * @param diskThresholdSettings the disk threshold settings of the cluster + * @return array of indices that are accepted for tiering + */ + public static HotToWarmTieringRequestContext validateHotToWarm( + final ClusterState currentState, + final Index[] indices, + final ClusterInfo clusterInfo, + final DiskThresholdSettings diskThresholdSettings + ) { + final String indexNames = Arrays.stream(indices).map(Index::getName).collect(Collectors.joining(", ")); + validateSearchNodes(currentState, indexNames); + validateDiskThresholdWaterMarkNotBreached(currentState, clusterInfo, diskThresholdSettings); + + final Set notHotIndices = new HashSet<>(); + final Set notRemoteStoreBacked = new HashSet<>(); + final Set closedIndices = new HashSet<>(); + final Set redIndices = new HashSet<>(); + final Set acceptedIndices = new HashSet<>(); + + final HotToWarmTieringRequestContext hotToWarmTieringRequestContext = new HotToWarmTieringRequestContext(); + + for (Index index : indices) { + if (!validateHotIndex(currentState, index)) { + notHotIndices.add(index.getName()); + continue; + } + if (!validateRemoteStoreIndex(currentState, index)) { + notRemoteStoreBacked.add(index.getName()); + continue; + } + if (!validateOpenIndex(currentState, index)) { + closedIndices.add(index.getName()); + continue; + } + if (!validateIndexHealth(currentState, index)) { + redIndices.add(index.getName()); + continue; + } + acceptedIndices.add(index); + } + + if (!notHotIndices.isEmpty()) { + logger.warn( + "Rejecting tiering request for indices [{}] because they are already in the warm tier or the tiering is currently in progress.", + notHotIndices + ); + hotToWarmTieringRequestContext.addToRejected( + notHotIndices, + "index is already in the warm tier or the tiering is currently in progress for the index" + ); + } + if (!notRemoteStoreBacked.isEmpty()) { + logger.warn("Rejecting tiering request for indices [{}] because they are not 
remote store enabled.", notRemoteStoreBacked); + hotToWarmTieringRequestContext.addToRejected(notRemoteStoreBacked, "index is not backed up by the remote store"); + } + if (!closedIndices.isEmpty()) { + logger.warn("Rejecting tiering request for indices [{}] because they are closed.", closedIndices); + hotToWarmTieringRequestContext.addToRejected(closedIndices, "index is closed"); + } + if (!redIndices.isEmpty()) { + logger.warn("Rejecting tiering request for indices [{}] because they are red.", redIndices); + hotToWarmTieringRequestContext.addToRejected(redIndices, "index is red"); + } + + Tuple, Set> eligibleNodesCapacity = validateEligibleNodesCapacity(clusterInfo, currentState, indices, 0); + hotToWarmTieringRequestContext.addToAccepted(eligibleNodesCapacity.v1()); + hotToWarmTieringRequestContext.addToRejected(eligibleNodesCapacity.v2(), "insufficient node capacity"); + logger.info("Successfully accepted indices for tiering are [{}]", acceptedIndices); + + return hotToWarmTieringRequestContext; + } + + /** + * Validates that there are eligible nodes with the search role in the current cluster state. + * (only for the dedicated case - to be removed later) + * + * @param currentState the current cluster state + * @param indexNames the names of the indices being validated + * @throws IllegalArgumentException if there are no eligible search nodes in the cluster + */ + public static void validateSearchNodes(final ClusterState currentState, final String indexNames) { + if (getEligibleNodes(currentState).isEmpty()) { + final String errorMsg = "Rejecting tiering request for indices [" + + indexNames + + "] because there are no nodes found with the search role"; + logger.warn(errorMsg); + throw new IllegalArgumentException(errorMsg); + } + } + + /** + * Validates that the specified index has the remote store setting enabled. + * + * @param state the current cluster state + * @param index the index to be validated + * @return true if the remote store setting is enabled for the index, false otherwise + */ + public static boolean validateRemoteStoreIndex(final ClusterState state, final Index index) { + return IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(state.metadata().getIndexSafe(index).getSettings()); + } + + /** + * Validates that the specified index is in the "hot" tiering state. + * + * @param state the current cluster state + * @param index the index to be validated + * @return true if the index is in the "hot" tiering state, false otherwise + */ + public static boolean validateHotIndex(final ClusterState state, final Index index) { + return IndexModule.TieringState.HOT.name().equals(INDEX_TIERING_STATE.get(state.metadata().getIndexSafe(index).getSettings())); + } + + /** + * Validates the health of the specified index in the current cluster state. + * + * @param currentState the current cluster state + * @param index the index to be validated + * @return true if the index health is not in the "red" state, false otherwise + */ + public static boolean validateIndexHealth(final ClusterState currentState, final Index index) { + final IndexRoutingTable indexRoutingTable = currentState.routingTable().index(index); + final IndexMetadata indexMetadata = currentState.metadata().index(index); + final ClusterIndexHealth indexHealth = new ClusterIndexHealth(indexMetadata, indexRoutingTable); + return !ClusterHealthStatus.RED.equals(indexHealth.getStatus()); + } + + /** + * Validates that the specified index is in the open state in the current cluster state. 
+ * + * @param currentState the current cluster state + * @param index the index to be validated + * @return true if the index is in the open state, false otherwise + */ + public static boolean validateOpenIndex(final ClusterState currentState, final Index index) { + return currentState.metadata().index(index).getState() == IndexMetadata.State.OPEN; + } + + /** + * Validates that the disk threshold low watermark is not breached on all the eligible nodes in the cluster. + * + * @param currentState the current cluster state + * @param clusterInfo the current nodes usage info for the cluster + * @param diskThresholdSettings the disk threshold settings of the cluster + * @throws IllegalArgumentException if the disk threshold low watermark is breached on all eligible nodes + */ + public static void validateDiskThresholdWaterMarkNotBreached( + final ClusterState currentState, + final ClusterInfo clusterInfo, + final DiskThresholdSettings diskThresholdSettings + ) { + final Map usages = clusterInfo.getNodeLeastAvailableDiskUsages(); + if (usages == null) { + logger.trace("skipping monitor as no disk usage information is available"); + return; + } + final Set nodeIds = getEligibleNodes(currentState).stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); + int count = 0; + for (String node : nodeIds) { + final DiskUsage diskUsage = usages.get(node); + if (diskUsage != null && diskUsage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes()) { + logger.warn("Disk threshold low watermark breached on the node [{}]", node); + count++; + } + } + if (count == nodeIds.size()) { + throw new IllegalArgumentException( + "Disk threshold low watermark breached on all the search nodes, hence no new index can be accepted for tiering " + ); + } + } + + /** + * Validates the capacity of eligible nodes in the cluster to accommodate the specified indices. 
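+ *
+ * An illustrative example with assumed numbers: given two indices of 10 and 30 bytes and only 25 bytes
+ * available in the warm tier, the combined 40 bytes does not fit, so indices are admitted in ascending
+ * size order while the running total still fits; the 10 byte index ends up in the accepted set and the
+ * 30 byte index in the rejected set.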
+ * + * @param clusterInfo the current nodes usage info for the cluster + * @param currentState the current cluster state + * @param indices the indices to be validated + * @param bufferSpace the amount of buffer space to reserve on each node + * @return a list of indices that can be accommodated by the eligible nodes + */ + public static Tuple, Set> validateEligibleNodesCapacity( + ClusterInfo clusterInfo, + final ClusterState currentState, + final Index[] indices, + final long bufferSpace + ) { + + final Set eligibleNodeIds = getEligibleNodes(currentState).stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); + long totalAvailableBytesInWarmTier = getTotalAvailableBytesInWarmTier( + clusterInfo.getNodeLeastAvailableDiskUsages(), + eligibleNodeIds, + bufferSpace + ); + + Map indexSizes = new HashMap<>(); + for (Index index : indices) { + indexSizes.put(index, getTotalIndexSize(currentState, clusterInfo, index.getName())); + } + + if (indexSizes.values().stream().mapToLong(Long::longValue).sum() < totalAvailableBytesInWarmTier) { + return new Tuple<>(Set.of(indices), Set.of()); + } + HashMap sortedIndexSizes = indexSizes.entrySet() + .stream() + .sorted(Map.Entry.comparingByValue()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, HashMap::new)); + + long requestIndexBytes = 0L; + Set acceptedIndices = new HashSet<>(); + Set rejectedIndices = new HashSet<>(); + for (Index index : sortedIndexSizes.keySet()) { + requestIndexBytes += sortedIndexSizes.get(index); + if (requestIndexBytes < totalAvailableBytesInWarmTier) { + acceptedIndices.add(index); + } else { + rejectedIndices.add(index.getName()); + } + } + if (!rejectedIndices.isEmpty()) { + logger.warn( + "Rejecting tiering request for indices [{}] because total available bytes is less than the " + "requested bytes.", + rejectedIndices + ); + } + return new Tuple<>(acceptedIndices, rejectedIndices); + } + + /** + * Calculates the total size of the specified index in the cluster. + * Note: This function only accounts for the primary shard size. + * + * @param clusterState the current state of the cluster + * @param clusterInfo the current nodes usage info for the cluster + * @param index the name of the index for which the total size is to be calculated + * @return the total size of the specified index in the cluster + */ + public static long getTotalIndexSize(ClusterState clusterState, ClusterInfo clusterInfo, String index) { + long totalIndexSize = 0; + List shardRoutings = clusterState.routingTable().allShards(index); + for (ShardRouting shardRouting : shardRoutings) { + if (shardRouting.primary()) { + totalIndexSize += clusterInfo.getShardSize(shardRouting, 0); + } + } + return totalIndexSize; + } + + /** + * Calculates the total available bytes in the warm tier of the cluster. 
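+ *
+ * A worked example with assumed numbers: for a node with 100 total bytes, 40 free bytes and a
+ * {@code bufferSpace} of 30, the reserved buffer is {@code min(30, 100 * 0.2) = 20} bytes, so the node
+ * contributes {@code max(0, 40 - 20) = 20} bytes to the warm tier capacity.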
+ * + * @param usages the current disk usage of the cluster + * @param nodeIds the set of warm nodes ids in the cluster + * @param bufferSpace the amount of buffer space to reserve on each node + * @return the total available bytes in the warm tier + */ + public static long getTotalAvailableBytesInWarmTier( + final Map usages, + final Set nodeIds, + final long bufferSpace + ) { + long totalAvailableBytes = 0; + for (String node : nodeIds) { + final long totalBytesOnNode = usages.get(node).getTotalBytes(); + final long totalAvailBytesOnNode = usages.get(node).getFreeBytes(); + final long diskSpaceBufferBytes = Math.round(Math.min(bufferSpace, totalBytesOnNode * .2)); + totalAvailableBytes += Math.max(0, (totalAvailBytesOnNode - diskSpaceBufferBytes)); + } + return totalAvailableBytes; + } + + /** + * Retrieves the set of eligible(search) nodes from the current cluster state. + * + * @param currentState the current cluster state + * @return the set of eligible nodes + */ + public static Set getEligibleNodes(final ClusterState currentState) { + final Map nodes = currentState.getNodes().getDataNodes(); + return nodes.values().stream().filter(DiscoveryNode::isSearchNode).collect(Collectors.toSet()); + } +} diff --git a/server/src/main/java/org/opensearch/tiering/package-info.java b/server/src/main/java/org/opensearch/tiering/package-info.java new file mode 100644 index 0000000000000..7852fd0865bdd --- /dev/null +++ b/server/src/main/java/org/opensearch/tiering/package-info.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Validator layer checks that OpenSearch tiering can perform to tier the indices + */ +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.tiering; diff --git a/server/src/test/java/org/opensearch/rest/action/tiering/HotToWarmTieringResponseTests.java b/server/src/test/java/org/opensearch/rest/action/tiering/HotToWarmTieringResponseTests.java new file mode 100644 index 0000000000000..b9f8c9d7119ae --- /dev/null +++ b/server/src/test/java/org/opensearch/rest/action/tiering/HotToWarmTieringResponseTests.java @@ -0,0 +1,101 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.rest.action.tiering; + +import org.opensearch.action.tiering.HotToWarmTieringResponse; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Test; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +public class HotToWarmTieringResponseTests extends OpenSearchTestCase { + + /** + * SerDe of {@link HotToWarmTieringResponse} over wire + * @throws IOException - in case of error + */ + @Test + public void testSerDeOfHotToWarmTiering() throws IOException { + HotToWarmTieringResponse randomResponse = randomHotToWarmTieringResponse(); + try (BytesStreamOutput out = new BytesStreamOutput()) { + randomResponse.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + HotToWarmTieringResponse deserializedResponse = new HotToWarmTieringResponse(in); + assertEquals(deserializedResponse.getFailedIndices(), randomResponse.getFailedIndices()); + } + } + } + + /** + * Verifies that ToXContent works with any random {@link HotToWarmTieringResponse} object + * @throws Exception - in case of error + */ + @Test + public void testToXContentWorksForRandomResponse() throws Exception { + HotToWarmTieringResponse testResponse = randomHotToWarmTieringResponse(); + XContentType xContentType = randomFrom(XContentType.values()); + try (XContentBuilder builder = XContentBuilder.builder(xContentType.xContent())) { + testResponse.toXContent(builder, ToXContent.EMPTY_PARAMS); + } + } + + /** + * Verify the XContent output of the response object + * @throws Exception - in case of error + */ + @Test + public void testToXContentOutput() throws Exception { + String[] indices = new String[] { "index1", "index2" }; + String[] errorReasons = new String[] { "reason1", "reason2" }; + List results = new LinkedList<>(); + for (int i = 0; i < indices.length; ++i) { + results.add(new HotToWarmTieringResponse.IndexResult(indices[i], errorReasons[i])); + } + HotToWarmTieringResponse testResponse = new HotToWarmTieringResponse(true, results); + + // generate a corresponding expected xcontent + XContentBuilder content = XContentFactory.jsonBuilder().startObject().field("acknowledged", true).startArray("failed_indices"); + + for (HotToWarmTieringResponse.IndexResult result : results) { + content.startObject().field("index", result.getIndex()).field("error", result.getFailureReason()).endObject(); + } + content.endArray().endObject(); + assertEquals(content.toString(), testResponse.toString()); + + } + + /** + * @return - randomly generated object of type {@link HotToWarmTieringResponse.IndexResult} + */ + private HotToWarmTieringResponse.IndexResult randomIndexResult() { + String indexName = randomAlphaOfLengthBetween(1, 50); + String failureReason = randomAlphaOfLengthBetween(1, 200); + return new HotToWarmTieringResponse.IndexResult(indexName, failureReason); + } + + /** + * @return - randomly generated object of type {@link HotToWarmTieringResponse} + */ + private HotToWarmTieringResponse randomHotToWarmTieringResponse() { + int numIndexResult = randomIntBetween(0, 10); + List indexResults = new LinkedList<>(); + for (int i = 0; i < numIndexResult; ++i) { + indexResults.add(randomIndexResult()); + } + return new 
HotToWarmTieringResponse(randomBoolean(), indexResults); + } +} diff --git a/server/src/test/java/org/opensearch/rest/action/tiering/TieringRequestTests.java b/server/src/test/java/org/opensearch/rest/action/tiering/TieringRequestTests.java new file mode 100644 index 0000000000000..659fa894148e0 --- /dev/null +++ b/server/src/test/java/org/opensearch/rest/action/tiering/TieringRequestTests.java @@ -0,0 +1,85 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.tiering; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.action.tiering.TieringIndexRequest; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +import static org.hamcrest.CoreMatchers.equalTo; + +public class TieringRequestTests extends OpenSearchTestCase { + + public void testTieringRequestWithListOfIndices() { + TieringIndexRequest request = new TieringIndexRequest(); + request.indices("foo", "bar", "baz"); + request.tier(TieringIndexRequest.Tier.WARM); + request.indices("test") + .indicesOptions(IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())); + ActionRequestValidationException validationException = request.validate(); + assertNull(validationException); + } + + public void testTieringRequestWithIndexPattern() { + TieringIndexRequest request = new TieringIndexRequest(); + request.indices("foo-*"); + request.tier(TieringIndexRequest.Tier.WARM); + ActionRequestValidationException validationException = request.validate(); + assertNull(validationException); + } + + public void testTieringRequestWithNullOrEmptyIndices() { + TieringIndexRequest request = new TieringIndexRequest(); + ActionRequestValidationException validationException = request.validate(); + assertNotNull(validationException); + request.indices(""); + validationException = request.validate(); + assertNotNull(validationException); + } + + public void testTieringRequestWithNullOrNotSupportedTier() { + TieringIndexRequest request = new TieringIndexRequest(); + request.indices("test"); + ActionRequestValidationException validationException = request.validate(); + assertNotNull(validationException); + request.tier(TieringIndexRequest.Tier.HOT); + validationException = request.validate(); + assertNotNull(validationException); + } + + public void testTieringTypeFromString() { + expectThrows(IllegalArgumentException.class, () -> TieringIndexRequest.Tier.fromString("tier")); + } + + public void testSerDeOfTieringRequest() throws IOException { + TieringIndexRequest request = new TieringIndexRequest("warm", "test"); + try (BytesStreamOutput out = new BytesStreamOutput()) { + request.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + final TieringIndexRequest deserializedRequest = new TieringIndexRequest(in); + assertEquals(request, deserializedRequest); + } + } + } + + public void testTieringRequestEquals() { + final TieringIndexRequest original = new TieringIndexRequest("warm", "test"); + original.indicesOptions(IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())); + final TieringIndexRequest expected = new TieringIndexRequest("warm", 
original.indices()); + expected.indicesOptions(original.indicesOptions()); + assertThat(expected, equalTo(original)); + assertThat(expected.indices(), equalTo(original.indices())); + assertThat(expected.indicesOptions(), equalTo(original.indicesOptions())); + } +} diff --git a/server/src/test/java/org/opensearch/rest/action/tiering/TieringRequestValidatorTests.java b/server/src/test/java/org/opensearch/rest/action/tiering/TieringRequestValidatorTests.java new file mode 100644 index 0000000000000..e7a8090328eb6 --- /dev/null +++ b/server/src/test/java/org/opensearch/rest/action/tiering/TieringRequestValidatorTests.java @@ -0,0 +1,312 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.tiering; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterInfo; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.DiskUsage; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.allocation.DiskThresholdSettings; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.Index; +import org.opensearch.index.IndexModule; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.tiering.TieringServiceValidator; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING; +import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING; +import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING; + +public class TieringRequestValidatorTests extends OpenSearchTestCase { + + private TieringServiceValidator validator = new TieringServiceValidator(); + + public void testValidateSearchNodes() { + ClusterState clusterStateWithSearchNodes = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(createNodes(2, 0, 0)) + .build(); + + // throws no errors + validator.validateSearchNodes(clusterStateWithSearchNodes, "test_index"); + } + + public void testValidateSearchNodesThrowError() { + ClusterState clusterStateWithNoSearchNodes = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(createNodes(0, 1, 1)) + .build(); + // throws error + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> validator.validateSearchNodes(clusterStateWithNoSearchNodes, "test") + ); + } + + public void testValidRemoteStoreIndex() { + String indexUuid = UUID.randomUUID().toString(); + String indexName = "test_index"; + + ClusterState clusterState1 = buildClusterState( + indexName, + indexUuid, + 
+            Settings.builder()
+                .put(IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.getKey(), true)
+                .put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT)
+                .build()
+        );
+
+        assertTrue(validator.validateRemoteStoreIndex(clusterState1, new Index(indexName, indexUuid)));
+    }
+
+    public void testNotRemoteStoreIndex() {
+        String indexUuid = UUID.randomUUID().toString();
+        String indexName = "test_index";
+        assertFalse(
+            validator.validateRemoteStoreIndex(buildClusterState(indexName, indexUuid, Settings.EMPTY), new Index(indexName, indexUuid))
+        );
+    }
+
+    public void testValidHotIndex() {
+        String indexUuid = UUID.randomUUID().toString();
+        String indexName = "test_index";
+        assertTrue(validator.validateHotIndex(buildClusterState(indexName, indexUuid, Settings.EMPTY), new Index(indexName, indexUuid)));
+    }
+
+    public void testNotHotIndex() {
+        String indexUuid = UUID.randomUUID().toString();
+        String indexName = "test_index";
+
+        IndexModule.TieringState tieringState = randomBoolean() ? IndexModule.TieringState.HOT_TO_WARM : IndexModule.TieringState.WARM;
+
+        ClusterState clusterState = buildClusterState(
+            indexName,
+            indexUuid,
+            Settings.builder().put(IndexModule.INDEX_TIERING_STATE.getKey(), tieringState).build()
+        );
+        assertFalse(validator.validateHotIndex(clusterState, new Index(indexName, indexUuid)));
+    }
+
+    public void testValidateIndexHealth() {
+        String indexUuid = UUID.randomUUID().toString();
+        String indexName = "test_index";
+        ClusterState clusterState = buildClusterState(indexName, indexUuid, Settings.EMPTY);
+        assertTrue(validator.validateIndexHealth(clusterState, new Index(indexName, indexUuid)));
+    }
+
+    public void testValidOpenIndex() {
+        String indexUuid = UUID.randomUUID().toString();
+        String indexName = "test_index";
+        assertTrue(validator.validateOpenIndex(buildClusterState(indexName, indexUuid, Settings.EMPTY), new Index(indexName, indexUuid)));
+    }
+
+    public void testNotValidOpenIndex() {
+        String indexUuid = UUID.randomUUID().toString();
+        String indexName = "test_index";
+        assertFalse(
+            validator.validateOpenIndex(
+                buildClusterState(indexName, indexUuid, Settings.EMPTY, IndexMetadata.State.CLOSE),
+                new Index(indexName, indexUuid)
+            )
+        );
+    }
+
+    public void testValidateDiskThresholdWaterMarkNotBreached() {
+        int noOfNodes = 2;
+        ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+            .nodes(createNodes(noOfNodes, 0, 0))
+            .build();
+
+        ClusterInfo clusterInfo = clusterInfo(noOfNodes, 100, 20);
+        DiskThresholdSettings diskThresholdSettings = diskThresholdSettings("10b", "10b", "5b");
+        // throws no error
+        validator.validateDiskThresholdWaterMarkNotBreached(clusterState, clusterInfo, diskThresholdSettings);
+    }
+
+    public void testValidateDiskThresholdWaterMarkNotBreachedThrowsError() {
+        int noOfNodes = 2;
+        ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+            .nodes(createNodes(noOfNodes, 0, 0))
+            .build();
+        ClusterInfo clusterInfo = clusterInfo(noOfNodes, 100, 5);
+        DiskThresholdSettings diskThresholdSettings = diskThresholdSettings("10b", "10b", "5b");
+        // throws error
+        expectThrows(
+            IllegalArgumentException.class,
+            () -> validator.validateDiskThresholdWaterMarkNotBreached(clusterState, clusterInfo, diskThresholdSettings)
+        );
+    }
+
+    public void testGetTotalIndexSize() {
+        String indexUuid = UUID.randomUUID().toString();
+        String indexName = "test_index";
+        ClusterState clusterState = ClusterState.builder(buildClusterState(indexName, indexUuid, Settings.EMPTY))
+            .nodes(createNodes(1, 0, 0))
+            .build();
+        Map<String, DiskUsage> diskUsages = diskUsages(1, 100, 50);
+        final Map<String, Long> shardSizes = new HashMap<>();
+        shardSizes.put("[test_index][0][p]", 10L); // 10 bytes
+        ClusterInfo clusterInfo = new ClusterInfo(diskUsages, null, shardSizes, null, Map.of(), Map.of());
+        assertEquals(10, validator.getTotalIndexSize(clusterState, clusterInfo, indexName));
+    }
+
+    public void testValidateEligibleNodesCapacityWithAllAccepted() {
+        String indexUuid = UUID.randomUUID().toString();
+        String indexName = "test_index";
+        Index[] indices = new Index[] { new Index(indexName, indexUuid) };
+        ClusterState clusterState = ClusterState.builder(buildClusterState(indexName, indexUuid, Settings.EMPTY))
+            .nodes(createNodes(1, 0, 0))
+            .build();
+        Map<String, DiskUsage> diskUsages = diskUsages(1, 100, 50);
+        final Map<String, Long> shardSizes = new HashMap<>();
+        shardSizes.put("[test_index][0][p]", 10L); // 10 bytes
+        ClusterInfo clusterInfo = new ClusterInfo(diskUsages, null, shardSizes, null, Map.of(), Map.of());
+        Tuple<Set<Index>, Set<String>> result = validator.validateEligibleNodesCapacity(clusterInfo, clusterState, indices, 0);
+        assertEquals(indices.length, result.v1().size());
+        assertTrue(result.v2().isEmpty());
+        assertEquals(Set.of(indices), result.v1());
+    }
+
+    public void testValidateEligibleNodesCapacityWithAllRejected() {
+        String indexUuid = UUID.randomUUID().toString();
+        String indexName = "test_index";
+        Index[] indices = new Index[] { new Index(indexName, indexUuid) };
+        ClusterState clusterState = ClusterState.builder(buildClusterState(indexName, indexUuid, Settings.EMPTY))
+            .nodes(createNodes(1, 0, 0))
+            .build();
+        Map<String, DiskUsage> diskUsages = diskUsages(1, 100, 10);
+        final Map<String, Long> shardSizes = new HashMap<>();
+        shardSizes.put("[test_index][0][p]", 20L); // 20 bytes
+        ClusterInfo clusterInfo = new ClusterInfo(diskUsages, null, shardSizes, null, Map.of(), Map.of());
+        Tuple<Set<Index>, Set<String>> result = validator.validateEligibleNodesCapacity(clusterInfo, clusterState, indices, 0);
+        assertTrue(result.v1().isEmpty());
+        assertEquals(indices.length, result.v2().size());
+        assertEquals(Set.of(indexName), result.v2());
+    }
+
+    public void testGetTotalAvailableBytesInWarmTier() {
+        Map<String, DiskUsage> diskUsages = diskUsages(2, 500, 100);
+        assertEquals(200, validator.getTotalAvailableBytesInWarmTier(diskUsages, Set.of("node-s0", "node-s1"), 0));
+    }
+
+    public void testEligibleNodes() {
+        ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+            .nodes(createNodes(2, 0, 0))
+            .build();
+
+        assertEquals(2, validator.getEligibleNodes(clusterState).size());
+
+        clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+            .nodes(createNodes(0, 1, 1))
+            .build();
+        assertEquals(0, validator.getEligibleNodes(clusterState).size());
+    }
+
+    private static ClusterState buildClusterState(String indexName, String indexUuid, Settings settings) {
+        return buildClusterState(indexName, indexUuid, settings, IndexMetadata.State.OPEN);
+    }
+
+    private static ClusterState buildClusterState(String indexName, String indexUuid, Settings settings, IndexMetadata.State state) {
+        Settings combinedSettings = Settings.builder().put(settings).put(createDefaultIndexSettings(indexUuid)).build();
+
+        Metadata metadata = Metadata.builder().put(IndexMetadata.builder(indexName).settings(combinedSettings).state(state)).build();
+
+        RoutingTable routingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build();
+
+        return ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+            .metadata(metadata)
+            .routingTable(routingTable)
+            .build();
+    }
+
+    private static Settings createDefaultIndexSettings(String indexUuid) {
+        return Settings.builder()
+            .put("index.version.created", Version.CURRENT)
+            .put(IndexMetadata.SETTING_INDEX_UUID, indexUuid)
+            .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 2)
+            .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
+            .build();
+    }
+
+    private DiscoveryNodes createNodes(int numOfSearchNodes, int numOfDataNodes, int numOfIngestNodes) {
+        DiscoveryNodes.Builder discoveryNodesBuilder = DiscoveryNodes.builder();
+        for (int i = 0; i < numOfSearchNodes; i++) {
+            discoveryNodesBuilder.add(
+                new DiscoveryNode(
+                    "node-s" + i,
+                    buildNewFakeTransportAddress(),
+                    Collections.emptyMap(),
+                    Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE),
+                    Version.CURRENT
+                )
+            );
+        }
+        for (int i = 0; i < numOfDataNodes; i++) {
+            discoveryNodesBuilder.add(
+                new DiscoveryNode(
+                    "node-d" + i,
+                    buildNewFakeTransportAddress(),
+                    Collections.emptyMap(),
+                    Collections.singleton(DiscoveryNodeRole.DATA_ROLE),
+                    Version.CURRENT
+                )
+            );
+        }
+        for (int i = 0; i < numOfIngestNodes; i++) {
+            discoveryNodesBuilder.add(
+                new DiscoveryNode(
+                    "node-i" + i,
+                    buildNewFakeTransportAddress(),
+                    Collections.emptyMap(),
+                    Collections.singleton(DiscoveryNodeRole.INGEST_ROLE),
+                    Version.CURRENT
+                )
+            );
+        }
+        return discoveryNodesBuilder.build();
+    }
+
+    private static DiskThresholdSettings diskThresholdSettings(String low, String high, String flood) {
+        return new DiskThresholdSettings(
+            Settings.builder()
+                .put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), low)
+                .put(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), high)
+                .put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), flood)
+                .build(),
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
+        );
+    }
+
+    private static ClusterInfo clusterInfo(int noOfNodes, long totalBytes, long freeBytes) {
+        final Map<String, DiskUsage> diskUsages = diskUsages(noOfNodes, totalBytes, freeBytes);
+        return new ClusterInfo(diskUsages, null, null, null, Map.of(), Map.of());
+    }
+
+    private static Map<String, DiskUsage> diskUsages(int noOfSearchNodes, long totalBytes, long freeBytes) {
+        final Map<String, DiskUsage> diskUsages = new HashMap<>();
+        for (int i = 0; i < noOfSearchNodes; i++) {
+            diskUsages.put("node-s" + i, new DiskUsage("node-s" + i, "node-s" + i, "/foo/bar", totalBytes, freeBytes));
+        }
+        return diskUsages;
+    }
+}
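Note (not part of the patch): the sketch below illustrates how the tiering request exercised by the tests above could be submitted through the new transport action. It assumes the HotToWarmTieringAction, HotToWarmTieringResponse, and TieringIndexRequest classes introduced by this change; the class name HotToWarmTieringUsageSketch and the listener bodies are hypothetical and shown only for orientation.

import org.opensearch.action.tiering.HotToWarmTieringAction;
import org.opensearch.action.tiering.HotToWarmTieringResponse;
import org.opensearch.action.tiering.TieringIndexRequest;
import org.opensearch.client.node.NodeClient;
import org.opensearch.core.action.ActionListener;

public final class HotToWarmTieringUsageSketch {

    // Submits a hot-to-warm tiering request for the given indices via the transport layer.
    public static void tierToWarm(NodeClient client, String... indices) {
        // Same constructor exercised by TieringRequestTests: target tier first, then indices.
        TieringIndexRequest request = new TieringIndexRequest("warm", indices);
        client.execute(HotToWarmTieringAction.INSTANCE, request, new ActionListener<HotToWarmTieringResponse>() {
            @Override
            public void onResponse(HotToWarmTieringResponse response) {
                // Per-index tiering results would be inspected here.
            }

            @Override
            public void onFailure(Exception e) {
                // Request or cluster-state validation failures would typically surface here.
            }
        });
    }
}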