Skip to content

Commit

Permalink
CatIndexTool implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Nov 2, 2023
1 parent 5e321d3 commit 027d06e
Show file tree
Hide file tree
Showing 4 changed files with 299 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,12 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.ml.common.output.model.ModelTensors;
import org.opensearch.ml.common.spi.tools.Parser;
import org.opensearch.ml.common.spi.tools.Tool;
import org.opensearch.ml.common.spi.tools.ToolAnnotation;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
Expand All @@ -52,19 +48,20 @@ public class CatIndexTool implements Tool {
private Client client;
private String modelId;
@Setter
private Parser inputParser;
private Parser<?, ?> inputParser;
@Setter
private Parser outputParser;
private Parser<?, ?> outputParser;
private ClusterService clusterService;

public CatIndexTool(Client client, ClusterService clusterService, String modelId) {
this.client = client;
this.clusterService = clusterService;
this.modelId = modelId;

outputParser = new Parser() {
outputParser = new Parser<>() {
@Override
public Object parse(Object o) {
@SuppressWarnings("unchecked")
List<ModelTensors> mlModelOutputs = (List<ModelTensors>) o;
return mlModelOutputs.get(0).getMlModelTensors().get(0).getDataAsMap().get("response");
}
Expand All @@ -73,80 +70,98 @@ public Object parse(Object o) {

@Override
public <T> void run(Map<String, String> parameters, ActionListener<T> listener) {
List<String> indexList = gson.fromJson(parameters.get("indices"), List.class);
String[] indices = parameters.containsKey("indices")? indexList.toArray(new String[0]) : new String[]{};

String[] indices = null;
if (parameters.containsKey("indices")) {
@SuppressWarnings("unchecked")
List<String> indexList = gson.fromJson(parameters.get("indices"), List.class);
indices = indexList.toArray(new String[0]);
}
final IndicesOptions indicesOptions = IndicesOptions.lenientExpandHidden();
final boolean includeUnloadedSegments = parameters.containsKey("include_unloaded_segments")
? Boolean.parseBoolean(parameters.get("include_unloaded_segments"))
: false;

final IndicesStatsRequest request = new IndicesStatsRequest();
request.indices(indices);
request.indicesOptions(indicesOptions);
request.all();
boolean includeUnloadedSegments = parameters.containsKey("include_unloaded_segments")? Boolean.parseBoolean(parameters.get("include_unloaded_segments")) : false;
request.includeUnloadedSegments(includeUnloadedSegments);
final IndicesStatsRequest request = new IndicesStatsRequest().indices(indices)
.indicesOptions(indicesOptions)
.all()
.includeUnloadedSegments(includeUnloadedSegments);

client.admin().indices().stats(request, ActionListener.wrap(r -> {
try {
Set<String> indexSet = r.getIndices().keySet(); //TODO: handle empty case
XContentBuilder xContentBuilder = XContentBuilder.builder(XContentType.JSON.xContent());
r.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
String response = xContentBuilder.toString();
Set<String> indexSet = r.getIndices().keySet();
// Handle empty set
if (indexSet.isEmpty()) {
@SuppressWarnings("unchecked")
T empty = (T) ("There were no results searching the indices parameter [" + parameters.get("indices") + "].");
listener.onResponse(empty);
return;
}

// Iterate indices in response and map index to stats
Map<String, IndexState> indexStateMap = new HashMap<>();
Metadata metadata = clusterService.state().metadata();

for (String index : indexSet) {
IndexMetadata indexMetadata = metadata.index(index);
IndexStats indexStats = r.getIndices().get(index);
CommonStats totalStats = indexStats.getTotal();
CommonStats primaryStats = indexStats.getPrimaries();
IndexState.IndexStateBuilder indexStateBuilder = IndexState.builder();
indexStateBuilder.status(indexMetadata.getState().toString());
indexStateBuilder.index(indexStats.getIndex());
indexStateBuilder.uuid(indexMetadata.getIndexUUID());
indexStateBuilder.primaryShard(indexMetadata.getNumberOfShards());
indexStateBuilder.replicaShard(indexMetadata.getNumberOfReplicas());
indexStateBuilder.docCount(primaryStats.docs.getCount());
indexStateBuilder.docDeleted(primaryStats.docs.getDeleted());
indexStateBuilder.storeSize(totalStats.getStore().size().toString());
indexStateBuilder.primaryStoreSize(primaryStats.getStore().getSize().toString());
indexStateMap.put(index, indexStateBuilder.build());
}

Map<String, IndexState> indexStateMap = new HashMap<>();
Metadata metadata = clusterService.state().metadata();
// Get cluster health for each index
final ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest(indexSet.toArray(new String[0]))
.indicesOptions(indicesOptions)
.local(parameters.containsKey("local") ? Boolean.parseBoolean("local") : false)
.clusterManagerNodeTimeout(DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT);

for (String index : indexSet) {
IndexMetadata indexMetadata = metadata.index(index);
client.admin().cluster().health(clusterHealthRequest, ActionListener.wrap(res -> {
// Add health to index stats
Map<String, ClusterIndexHealth> indexHealthMap = res.getIndices();
for (String index : indexHealthMap.keySet()) {
IndexStats indexStats = r.getIndices().get(index);
CommonStats totalStats = indexStats.getTotal();
CommonStats primaryStats = indexStats.getPrimaries();
IndexState.IndexStateBuilder indexStateBuilder = IndexState.builder();
indexStateBuilder.status(indexMetadata.getState().toString());
indexStateBuilder.index(indexStats.getIndex());
indexStateBuilder.uuid(indexMetadata.getIndexUUID());
indexStateBuilder.primaryShard(indexMetadata.getNumberOfShards());
indexStateBuilder.replicaShard(indexMetadata.getNumberOfReplicas());
indexStateBuilder.docCount(primaryStats.docs.getCount());
indexStateBuilder.docDeleted(primaryStats.docs.getDeleted());
indexStateBuilder.storeSize(totalStats.getStore().size().toString());
indexStateBuilder.primaryStoreSize(primaryStats.getStore().getSize().toString());
indexStateMap.put(index, indexStateBuilder.build());
}

final ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest();
clusterHealthRequest.indices(indexSet.toArray(new String[0]));
clusterHealthRequest.indicesOptions(indicesOptions);
boolean local = parameters.containsKey("local")? Boolean.parseBoolean("local") : false;
clusterHealthRequest.local(local);
clusterHealthRequest.clusterManagerNodeTimeout(DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT);

client.admin().cluster().health(clusterHealthRequest, ActionListener.wrap(res-> {
Map<String, ClusterIndexHealth> indexHealthMap = res.getIndices();
for (String index : indexHealthMap.keySet()) {
IndexStats indexStats = r.getIndices().get(index);
final ClusterIndexHealth indexHealth = indexHealthMap.get(index);
final String health;
if (indexHealth != null) {
health = indexHealth.getStatus().toString().toLowerCase(Locale.ROOT);
} else if (indexStats != null) {
health = "red*";
} else {
health = "";
}
indexStateMap.get(index).setHealth(health);
}
StringBuilder responseBuilder = new StringBuilder("health\tstatus\tindex\tuuid\tpri\trep\tdocs.count\tdocs.deleted\tstore.size\tpri.store.size\n");
for (String index : indexStateMap.keySet()) {
responseBuilder.append(indexStateMap.get(index).toString()).append("\n");
final ClusterIndexHealth indexHealth = indexHealthMap.get(index);
final String health;
if (indexHealth != null) {
health = indexHealth.getStatus().toString().toLowerCase(Locale.ROOT);
} else if (indexStats != null) {
health = "red*";
} else {
health = "";
}
listener.onResponse((T)responseBuilder.toString());
}, ex->{listener.onFailure(ex);}));
} catch (IOException e) {
listener.onFailure(e);
}
}, e -> {
listener.onFailure(e);
}));
indexStateMap.get(index).setHealth(health);
}
// Prepare output with header row
StringBuilder responseBuilder = new StringBuilder(
"health\tstatus\tindex\tuuid\tpri\trep\tdocs.count\tdocs.deleted\tstore.size\tpri.store.size\n"
);
// Output a row for each index
for (IndexState state : indexStateMap.values()) {
responseBuilder.append(state.getHealth()).append('\t');
responseBuilder.append(state.getStatus()).append('\t');
responseBuilder.append(state.getIndex()).append('\t');
responseBuilder.append(state.getUuid()).append('\t');
responseBuilder.append(state.getPrimaryShard()).append('\t');
responseBuilder.append(state.getReplicaShard()).append('\t');
responseBuilder.append(state.getDocCount()).append('\t');
responseBuilder.append(state.getDocDeleted()).append('\t');
responseBuilder.append(state.getStoreSize()).append('\t');
responseBuilder.append(state.getPrimaryStoreSize()).append('\n');
}
@SuppressWarnings("unchecked")
T s = (T) responseBuilder.toString();
listener.onResponse(s);
}, ex -> { listener.onFailure(ex); }));
}, e -> { listener.onFailure(e); }));
}

@Data
Expand Down Expand Up @@ -175,21 +190,6 @@ public IndexState(String health, String status, String index, String uuid, Integ
this.storeSize = storeSize;
this.primaryStoreSize = primaryStoreSize;
}

@Override
public String toString() {
return
health + '\t' +
status + '\t' +
index + '\t' +
uuid + '\t' +
primaryShard + '\t' +
replicaShard + '\t' +
docCount + '\t' +
docDeleted + '\t' +
storeSize + '\t' +
primaryStoreSize;
}
}


Expand Down Expand Up @@ -231,7 +231,7 @@ public void init(Client client, ClusterService clusterService) {

@Override
public CatIndexTool create(Map<String, Object> map) {
return new CatIndexTool(client, clusterService, (String)map.get("model_id"));
return new CatIndexTool(client, clusterService, (String) map.get("model_id"));
}

@Override
Expand Down
Loading

0 comments on commit 027d06e

Please sign in to comment.