Skip to content

Commit

Permalink
Add rest, transport, service layer changes for Tiering
Browse files Browse the repository at this point in the history
Signed-off-by: Neetika Singhal <[email protected]>
  • Loading branch information
neetikasinghal committed Jun 24, 2024
1 parent 212efd7 commit 8099f62
Show file tree
Hide file tree
Showing 14 changed files with 2,284 additions and 1 deletion.
9 changes: 9 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@
import org.opensearch.action.termvectors.TransportMultiTermVectorsAction;
import org.opensearch.action.termvectors.TransportShardMultiTermsVectorAction;
import org.opensearch.action.termvectors.TransportTermVectorsAction;
import org.opensearch.action.tiering.RestWarmTieringAction;
import org.opensearch.action.tiering.TransportWarmTieringAction;
import org.opensearch.action.tiering.WarmTieringAction;
import org.opensearch.action.update.TransportUpdateAction;
import org.opensearch.action.update.UpdateAction;
import org.opensearch.client.node.NodeClient;
Expand Down Expand Up @@ -634,6 +637,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class);
actions.register(CloneSnapshotAction.INSTANCE, TransportCloneSnapshotAction.class);
actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class);
if (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX)) {
actions.register(WarmTieringAction.INSTANCE, TransportWarmTieringAction.class);
}
actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);

actions.register(ClusterAddWeightedRoutingAction.INSTANCE, TransportAddWeightedRoutingAction.class);
Expand Down Expand Up @@ -966,6 +972,9 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestNodeAttrsAction());
registerHandler.accept(new RestRepositoriesAction());
registerHandler.accept(new RestSnapshotAction());
if (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX)) {
registerHandler.accept(new RestWarmTieringAction());
}
registerHandler.accept(new RestTemplatesAction());

// Point in time API
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.tiering;

import org.opensearch.common.UUIDs;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.Index;

import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class HotToWarmTieringRequestContext {
private Set<Index> acceptedIndices;
private Map<String, String> rejectedIndices;
private Set<String> notFoundIndices;
private Set<Index> inProgressIndices;
private Set<String> successfulIndices;
private Set<String> failedIndices;
private ActionListener<HotToWarmTieringResponse> actionListener;
private TieringIndexRequest request;
private String requestUuid;

public HotToWarmTieringRequestContext() {
this.acceptedIndices = new HashSet<>();
this.rejectedIndices = new HashMap<>();
this.notFoundIndices = new HashSet<>();
this.successfulIndices = new HashSet<>();
this.failedIndices = new HashSet<>();
this.inProgressIndices = new HashSet<>();
this.requestUuid = UUIDs.randomBase64UUID();
}

public HotToWarmTieringRequestContext(ActionListener<HotToWarmTieringResponse> actionListener, TieringIndexRequest request) {
this();
this.actionListener = actionListener;
this.request = request;
}

public HotToWarmTieringRequestContext(Set<Index> acceptedIndices, Map<String, String> rejectedIndices, Set<String> notFoundIndices) {
this.acceptedIndices = acceptedIndices;
this.rejectedIndices = rejectedIndices;
this.notFoundIndices = notFoundIndices;
}

public Set<Index> getAcceptedIndices() {
return acceptedIndices;
}

public void setAcceptedIndices(Set<Index> acceptedIndices) {
this.acceptedIndices = acceptedIndices;
}

public Map<String, String> getRejectedIndices() {
return rejectedIndices;
}

public Set<String> getNotFoundIndices() {
return notFoundIndices;
}

public void setNotFoundIndices(Set<String> notFoundIndices) {
this.notFoundIndices = notFoundIndices;
}

public void setRejectedIndices(Map<String, String> rejectedIndices) {
this.rejectedIndices = rejectedIndices;
}

public void addToNotFound(String indexName) {
notFoundIndices.add(indexName);
}

/**
* Method to move indices from success list to failed list with corresponding reasons to be sent in response
* @param indicesToFail - sets of indices that failed validation with same reason
* @param failureReason - reason for not accepting migration for this index
*/
public void addToRejected(Set<String> indicesToFail, String failureReason) {
for (String index : indicesToFail) {
addToRejected(index, failureReason);
}
}

/**
* Method to move index from success list to failed list with corresponding reasons to be sent in response
* @param indexName - indexName that failed validation
* @param failureReason - reason for not accepting migration for this index
*/
public void addToRejected(String indexName, String failureReason) {
rejectedIndices.put(indexName, failureReason);
}

public void addToAccepted(Index index) {
acceptedIndices.add(index);
}

public void addToAccepted(Set<Index> indices) {
acceptedIndices.addAll(indices);
}

public Set<String> getSuccessfulIndices() {
return successfulIndices;
}

public void setSuccessfulIndices(Set<String> successfulIndices) {
this.successfulIndices = successfulIndices;
}

public void addToSuccessful(String index) {
successfulIndices.add(index);
}

public Set<String> getFailedIndices() {
return failedIndices;
}

public void setFailedIndices(Set<String> failedIndices) {
this.failedIndices = failedIndices;
}

public void addToFailed(String index) {
failedIndices.add(index);
}

public ActionListener<HotToWarmTieringResponse> getListener() {
return actionListener;
}

public void setActionListener(ActionListener<HotToWarmTieringResponse> actionListener) {
this.actionListener = actionListener;
}

public TieringIndexRequest getRequest() {
return request;
}

public void setRequest(TieringIndexRequest request) {
this.request = request;
}

public String getRequestUuid() {
return requestUuid;
}

public void setRequestUuid(String requestUuid) {
this.requestUuid = requestUuid;
}

public Set<Index> getInProgressIndices() {
return inProgressIndices;
}

public void setInProgressIndices(Set<Index> inProgressIndices) {
this.inProgressIndices = inProgressIndices;
}

public void addToInProgress(Index index) {
inProgressIndices.add(index);
}

public void removeFromInProgress(Index index) {
inProgressIndices.remove(index);
}

public boolean isRequestProcessingComplete() {
return inProgressIndices.isEmpty();
}

public HotToWarmTieringResponse constructResponse() {
final List<HotToWarmTieringResponse.IndexResult> indicesResult = new LinkedList<>();
for (Map.Entry<String, String> rejectedIndex : rejectedIndices.entrySet()) {
indicesResult.add(new HotToWarmTieringResponse.IndexResult(rejectedIndex.getKey(), rejectedIndex.getValue()));
}
for (String index : notFoundIndices) {
indicesResult.add(new HotToWarmTieringResponse.IndexResult(index, "Index not found"));
}
for (String index : failedIndices) {
indicesResult.add(new HotToWarmTieringResponse.IndexResult(index, "Index failed"));
}
return new HotToWarmTieringResponse(acceptedIndices.size() > 0, indicesResult);
}

@Override
public String toString() {
return "HotToWarmTieringRequestContext{"
+ "acceptedIndices="
+ acceptedIndices
+ ", rejectedIndices="
+ rejectedIndices
+ ", notFoundIndices="
+ notFoundIndices
+ ", inProgressIndices="
+ inProgressIndices
+ ", successfulIndices="
+ successfulIndices
+ ", failedIndices="
+ failedIndices
+ ", actionListener="
+ actionListener
+ ", request="
+ request
+ ", requestUuid='"
+ requestUuid
+ '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package org.opensearch.action.tiering;

import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;

/**
* Response object for an {@link TieringIndexRequest} which is sent to client after the initial verification of the request
* by the backend service. The format of the response object will be as below:
*
* {
* "acknowledged": true/false,
* "failed_indices": [
* {
* "index": "index1",
* "error": "Low disk threshold watermark breached"
* },
* {
* "index": "index2",
* "error": "Index is not a remote store backed index"
* }
* ]
* }
*/
public class HotToWarmTieringResponse extends AcknowledgedResponse {

private List<IndexResult> failedIndices;

public HotToWarmTieringResponse(boolean acknowledged, List<IndexResult> indicesResults) {
super(acknowledged);
this.failedIndices = (indicesResults == null) ? new LinkedList<>() : indicesResults;
}

public HotToWarmTieringResponse(StreamInput in) throws IOException {
super(in);
failedIndices = Collections.unmodifiableList(in.readList(IndexResult::new));
}

public HotToWarmTieringResponse failedIndices(List<IndexResult> failedIndices) {
this.failedIndices = failedIndices;
return this;
}

public List<HotToWarmTieringResponse.IndexResult> getFailedIndices() {
return this.failedIndices;
}

public void addIndexResult(String indexName, String failureReason) {
failedIndices.add(new IndexResult(indexName, failureReason));
}

public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(this.failedIndices);
}

protected void addCustomFields(XContentBuilder builder, Params params) throws IOException {
super.addCustomFields(builder, params);
builder.startArray("failed_indices");

for (IndexResult failedIndex : failedIndices) {
failedIndex.toXContent(builder, params);
}
builder.endArray();
}

@Override
public String toString() {
return Strings.toString(MediaTypeRegistry.JSON, this);
}

public static class IndexResult implements Writeable, ToXContentFragment {
private final String index;
private final String failureReason;

public IndexResult(String index, String failureReason) {
this.index = index;
this.failureReason = failureReason;
}

IndexResult(StreamInput in) throws IOException {
this.index = in.readString();
this.failureReason = in.readString();
}

public void writeTo(StreamOutput out) throws IOException {
out.writeString(index);
out.writeString(failureReason);
}

public String getIndex() {
return index;
}

public String getFailureReason() {
return failureReason;
}

public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("index", index);
builder.field("error", failureReason);
return builder.endObject();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
IndexResult that = (IndexResult) o;
return Objects.equals(index, that.index) && Objects.equals(failureReason, that.failureReason);
}

@Override
public int hashCode() {
int result = Objects.hashCode(index);
result = 31 * result + Objects.hashCode(failureReason);
return result;
}

@Override
public String toString() {
return Strings.toString(MediaTypeRegistry.JSON, this);
}
}
}
Loading

0 comments on commit 8099f62

Please sign in to comment.