Skip to content

Commit

Permalink
Add rest, transport layer changes for Hot to warm tiering - dedicated…
Browse files Browse the repository at this point in the history
… setup (opensearch-project#13980)

Signed-off-by: Neetika Singhal <[email protected]>
  • Loading branch information
neetikasinghal authored Jul 23, 2024
1 parent 90d5500 commit c82a282
Show file tree
Hide file tree
Showing 17 changed files with 1,630 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Reduce logging in DEBUG for MasterService:run ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))
- Enabling term version check on local state for all ClusterManager Read Transport Actions ([#14273](https://github.com/opensearch-project/OpenSearch/pull/14273))
- Add persian_stem filter (([#14847](https://github.com/opensearch-project/OpenSearch/pull/14847)))
- Add rest, transport layer changes for hot to warm tiering - dedicated setup (([#13980](https://github.com/opensearch-project/OpenSearch/pull/13980))

### Dependencies
- Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442))
Expand Down
9 changes: 9 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@
import org.opensearch.action.admin.indices.template.put.TransportPutComponentTemplateAction;
import org.opensearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.opensearch.action.admin.indices.template.put.TransportPutIndexTemplateAction;
import org.opensearch.action.admin.indices.tiering.HotToWarmTieringAction;
import org.opensearch.action.admin.indices.tiering.RestWarmTieringAction;
import org.opensearch.action.admin.indices.tiering.TransportHotToWarmTieringAction;
import org.opensearch.action.admin.indices.upgrade.get.TransportUpgradeStatusAction;
import org.opensearch.action.admin.indices.upgrade.get.UpgradeStatusAction;
import org.opensearch.action.admin.indices.upgrade.post.TransportUpgradeAction;
Expand Down Expand Up @@ -634,6 +637,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class);
actions.register(CloneSnapshotAction.INSTANCE, TransportCloneSnapshotAction.class);
actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class);
if (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX)) {
actions.register(HotToWarmTieringAction.INSTANCE, TransportHotToWarmTieringAction.class);
}
actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);

actions.register(ClusterAddWeightedRoutingAction.INSTANCE, TransportAddWeightedRoutingAction.class);
Expand Down Expand Up @@ -966,6 +972,9 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestNodeAttrsAction());
registerHandler.accept(new RestRepositoriesAction());
registerHandler.accept(new RestSnapshotAction());
if (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX)) {
registerHandler.accept(new RestWarmTieringAction());
}
registerHandler.accept(new RestTemplatesAction());

// Point in time API
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.tiering;

import org.opensearch.action.ActionType;
import org.opensearch.common.annotation.ExperimentalApi;

/**
* Tiering action to move indices from hot to warm
*
* @opensearch.experimental
*/
@ExperimentalApi
public class HotToWarmTieringAction extends ActionType<HotToWarmTieringResponse> {

public static final HotToWarmTieringAction INSTANCE = new HotToWarmTieringAction();
public static final String NAME = "indices:admin/tier/hot_to_warm";

private HotToWarmTieringAction() {
super(NAME, HotToWarmTieringResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.tiering;

import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* Response object for an {@link TieringIndexRequest} which is sent to client after the initial verification of the request
* by the backend service. The format of the response object will be as below:
*
* {
* "acknowledged": true/false,
* "failed_indices": [
* {
* "index": "index1",
* "error": "Low disk threshold watermark breached"
* },
* {
* "index": "index2",
* "error": "Index is not a remote store backed index"
* }
* ]
* }
*
* @opensearch.experimental
*/
@ExperimentalApi
public class HotToWarmTieringResponse extends AcknowledgedResponse {

private final List<IndexResult> failedIndices;

public HotToWarmTieringResponse(boolean acknowledged) {
super(acknowledged);
this.failedIndices = Collections.emptyList();
}

public HotToWarmTieringResponse(boolean acknowledged, List<IndexResult> indicesResults) {
super(acknowledged);
this.failedIndices = (indicesResults == null)
? Collections.emptyList()
: indicesResults.stream().sorted(Comparator.comparing(IndexResult::getIndex)).collect(Collectors.toList());
}

public HotToWarmTieringResponse(StreamInput in) throws IOException {
super(in);
failedIndices = Collections.unmodifiableList(in.readList(IndexResult::new));
}

public List<IndexResult> getFailedIndices() {
return this.failedIndices;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(this.failedIndices);
}

@Override
protected void addCustomFields(XContentBuilder builder, Params params) throws IOException {
super.addCustomFields(builder, params);
builder.startArray("failed_indices");

for (IndexResult failedIndex : failedIndices) {
failedIndex.toXContent(builder, params);
}
builder.endArray();
}

@Override
public String toString() {
return Strings.toString(MediaTypeRegistry.JSON, this);
}

/**
* Inner class to represent the result of a failed index for tiering.
* @opensearch.experimental
*/
@ExperimentalApi
public static class IndexResult implements Writeable, ToXContentFragment {
private final String index;
private final String failureReason;

public IndexResult(String index, String failureReason) {
this.index = index;
this.failureReason = failureReason;
}

IndexResult(StreamInput in) throws IOException {
this.index = in.readString();
this.failureReason = in.readString();
}

public String getIndex() {
return index;
}

public String getFailureReason() {
return failureReason;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(index);
out.writeString(failureReason);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("index", index);
builder.field("error", failureReason);
return builder.endObject();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
IndexResult that = (IndexResult) o;
return Objects.equals(index, that.index) && Objects.equals(failureReason, that.failureReason);
}

@Override
public int hashCode() {
int result = Objects.hashCode(index);
result = 31 * result + Objects.hashCode(failureReason);
return result;
}

@Override
public String toString() {
return Strings.toString(MediaTypeRegistry.JSON, this);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.tiering;

import org.opensearch.action.support.IndicesOptions;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;

import java.util.List;

import static java.util.Collections.singletonList;
import static org.opensearch.core.common.Strings.splitStringByCommaToArray;
import static org.opensearch.rest.RestRequest.Method.POST;

/**
* Rest Tiering API action to move indices to warm tier
*
* @opensearch.experimental
*/
@ExperimentalApi
public class RestWarmTieringAction extends BaseRestHandler {

private static final String TARGET_TIER = "warm";

@Override
public List<RestHandler.Route> routes() {
return singletonList(new RestHandler.Route(POST, "/{index}/_tier/" + TARGET_TIER));
}

@Override
public String getName() {
return "warm_tiering_action";
}

@Override
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
final TieringIndexRequest tieringIndexRequest = new TieringIndexRequest(
TARGET_TIER,
splitStringByCommaToArray(request.param("index"))
);
tieringIndexRequest.timeout(request.paramAsTime("timeout", tieringIndexRequest.timeout()));
tieringIndexRequest.clusterManagerNodeTimeout(
request.paramAsTime("cluster_manager_timeout", tieringIndexRequest.clusterManagerNodeTimeout())
);
tieringIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, tieringIndexRequest.indicesOptions()));
tieringIndexRequest.waitForCompletion(request.paramAsBoolean("wait_for_completion", tieringIndexRequest.waitForCompletion()));
return channel -> client.admin()
.cluster()
.execute(HotToWarmTieringAction.INSTANCE, tieringIndexRequest, new RestToXContentListener<>(channel));
}
}
Loading

0 comments on commit c82a282

Please sign in to comment.