From 7863859c28cab9749a3339f114bea65839a3ed4b Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Sun, 17 Dec 2023 19:47:07 -0800 Subject: [PATCH] Add CatIndexTool (#1746) (#1770) * Add CatIndexTool Signed-off-by: Daniel Widdis * Add test coverage Signed-off-by: Daniel Widdis --------- Signed-off-by: Daniel Widdis (cherry picked from commit 48ad895e0f6beea5f2843da1d403c333f4b5ffda) Co-authored-by: Daniel Widdis --- .gitignore | 3 + .../ml/engine/tools/CatIndexTool.java | 435 ++++++++++++++++++ .../ml/engine/tools/CatIndexToolTests.java | 245 ++++++++++ 3 files changed, 683 insertions(+) create mode 100644 ml-algorithms/src/main/java/org/opensearch/ml/engine/tools/CatIndexTool.java create mode 100644 ml-algorithms/src/test/java/org/opensearch/ml/engine/tools/CatIndexToolTests.java diff --git a/.gitignore b/.gitignore index 837a460577..227e5d094d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,9 @@ .gradle/ build/ .idea/ +.project +.classpath +.settings client/build/ common/build/ ml-algorithms/build/ diff --git a/ml-algorithms/src/main/java/org/opensearch/ml/engine/tools/CatIndexTool.java b/ml-algorithms/src/main/java/org/opensearch/ml/engine/tools/CatIndexTool.java new file mode 100644 index 0000000000..16cec3870d --- /dev/null +++ b/ml-algorithms/src/main/java/org/opensearch/ml/engine/tools/CatIndexTool.java @@ -0,0 +1,435 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.ml.engine.tools; + +import static org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest.DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT; +import static org.opensearch.ml.common.utils.StringUtils.gson; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Spliterators; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import org.apache.logging.log4j.util.Strings; +import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest; +import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.opensearch.action.admin.indices.stats.CommonStats; +import org.opensearch.action.admin.indices.stats.IndexStats; +import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.support.GroupedActionListener; +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.client.Client; +import org.opensearch.cluster.health.ClusterIndexHealth; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Table; +import org.opensearch.common.Table.Cell; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.index.IndexSettings; +import org.opensearch.ml.common.output.model.ModelTensors; +import org.opensearch.ml.common.spi.tools.Parser; +import org.opensearch.ml.common.spi.tools.Tool; +import org.opensearch.ml.common.spi.tools.ToolAnnotation; + +import lombok.Getter; +import lombok.Setter; + +@ToolAnnotation(CatIndexTool.TYPE) +public class CatIndexTool implements Tool { + public static final String TYPE = "CatIndexTool"; + private static final String DEFAULT_DESCRIPTION = "Use this tool to get index information."; + + @Setter + @Getter + private String name = CatIndexTool.TYPE; + @Getter + @Setter + private String description = DEFAULT_DESCRIPTION; + @Getter + private String version; + + private Client client; + @Setter + private Parser inputParser; + @Setter + private Parser outputParser; + @SuppressWarnings("unused") + private ClusterService clusterService; + + public CatIndexTool(Client client, ClusterService clusterService) { + this.client = client; + this.clusterService = clusterService; + + outputParser = new Parser<>() { + @Override + public Object parse(Object o) { + @SuppressWarnings("unchecked") + List mlModelOutputs = (List) o; + return mlModelOutputs.get(0).getMlModelTensors().get(0).getDataAsMap().get("response"); + } + }; + } + + @Override + public void run(Map parameters, ActionListener listener) { + // TODO: This logic exactly matches the OpenSearch _cat/indices REST action. If code at + // o.o.rest/action/cat/RestIndicesAction.java changes those changes need to be reflected here + // https://github.com/opensearch-project/ml-commons/pull/1582#issuecomment-1796962876 + @SuppressWarnings("unchecked") + List indexList = parameters.containsKey("indices") + ? gson.fromJson(parameters.get("indices"), List.class) + : Collections.emptyList(); + final String[] indices = indexList.toArray(Strings.EMPTY_ARRAY); + + final IndicesOptions indicesOptions = IndicesOptions.strictExpand(); + final boolean local = parameters.containsKey("local") ? Boolean.parseBoolean("local") : false; + final TimeValue clusterManagerNodeTimeout = DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT; + final boolean includeUnloadedSegments = parameters.containsKey("include_unloaded_segments") + ? Boolean.parseBoolean(parameters.get("include_unloaded_segments")) + : false; + + final ActionListener internalListener = ActionListener.notifyOnce(ActionListener.wrap(table -> { + // Handle empty table + if (table.getRows().isEmpty()) { + @SuppressWarnings("unchecked") + T empty = (T) ("There were no results searching the indices parameter [" + parameters.get("indices") + "]."); + listener.onResponse(empty); + return; + } + StringBuilder sb = new StringBuilder( + // Currently using c.value which is short header matching _cat/indices + // May prefer to use c.attr.get("desc") for full description + table.getHeaders().stream().map(c -> c.value.toString()).collect(Collectors.joining("\t", "", "\n")) + ); + for (List row : table.getRows()) { + sb.append(row.stream().map(c -> c.value == null ? null : c.value.toString()).collect(Collectors.joining("\t", "", "\n"))); + } + @SuppressWarnings("unchecked") + T response = (T) sb.toString(); + listener.onResponse(response); + }, listener::onFailure)); + + sendGetSettingsRequest( + indices, + indicesOptions, + local, + clusterManagerNodeTimeout, + client, + new ActionListener() { + @Override + public void onResponse(final GetSettingsResponse getSettingsResponse) { + final GroupedActionListener groupedListener = createGroupedListener(4, internalListener); + groupedListener.onResponse(getSettingsResponse); + + // The list of indices that will be returned is determined by the indices returned from the Get Settings call. + // All the other requests just provide additional detail, and wildcards may be resolved differently depending on the + // type of request in the presence of security plugins (looking at you, ClusterHealthRequest), so + // force the IndicesOptions for all the sub-requests to be as inclusive as possible. + final IndicesOptions subRequestIndicesOptions = IndicesOptions.lenientExpandHidden(); + + sendIndicesStatsRequest( + indices, + subRequestIndicesOptions, + includeUnloadedSegments, + client, + ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure) + ); + sendClusterStateRequest( + indices, + subRequestIndicesOptions, + local, + clusterManagerNodeTimeout, + client, + ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure) + ); + sendClusterHealthRequest( + indices, + subRequestIndicesOptions, + local, + clusterManagerNodeTimeout, + client, + ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure) + ); + } + + @Override + public void onFailure(final Exception e) { + internalListener.onFailure(e); + } + } + ); + } + + @Override + public String getType() { + return TYPE; + } + + /** + * We're using the Get Settings API here to resolve the authorized indices for the user. + * This is because the Cluster State and Cluster Health APIs do not filter output based + * on index privileges, so they can't be used to determine which indices are authorized + * or not. On top of this, the Indices Stats API cannot be used either to resolve indices + * as it does not provide information for all existing indices (for example recovering + * indices or non replicated closed indices are not reported in indices stats response). + */ + private void sendGetSettingsRequest( + final String[] indices, + final IndicesOptions indicesOptions, + final boolean local, + final TimeValue clusterManagerNodeTimeout, + final Client client, + final ActionListener listener + ) { + final GetSettingsRequest request = new GetSettingsRequest(); + request.indices(indices); + request.indicesOptions(indicesOptions); + request.local(local); + request.clusterManagerNodeTimeout(clusterManagerNodeTimeout); + request.names(IndexSettings.INDEX_SEARCH_THROTTLED.getKey()); + + client.admin().indices().getSettings(request, listener); + } + + private void sendClusterStateRequest( + final String[] indices, + final IndicesOptions indicesOptions, + final boolean local, + final TimeValue clusterManagerNodeTimeout, + final Client client, + final ActionListener listener + ) { + + final ClusterStateRequest request = new ClusterStateRequest(); + request.indices(indices); + request.indicesOptions(indicesOptions); + request.local(local); + request.clusterManagerNodeTimeout(clusterManagerNodeTimeout); + + client.admin().cluster().state(request, listener); + } + + private void sendClusterHealthRequest( + final String[] indices, + final IndicesOptions indicesOptions, + final boolean local, + final TimeValue clusterManagerNodeTimeout, + final Client client, + final ActionListener listener + ) { + + final ClusterHealthRequest request = new ClusterHealthRequest(); + request.indices(indices); + request.indicesOptions(indicesOptions); + request.local(local); + request.clusterManagerNodeTimeout(clusterManagerNodeTimeout); + + client.admin().cluster().health(request, listener); + } + + private void sendIndicesStatsRequest( + final String[] indices, + final IndicesOptions indicesOptions, + final boolean includeUnloadedSegments, + final Client client, + final ActionListener listener + ) { + + final IndicesStatsRequest request = new IndicesStatsRequest(); + request.indices(indices); + request.indicesOptions(indicesOptions); + request.all(); + request.includeUnloadedSegments(includeUnloadedSegments); + + client.admin().indices().stats(request, listener); + } + + private GroupedActionListener createGroupedListener(final int size, final ActionListener
listener) { + return new GroupedActionListener<>(new ActionListener>() { + @Override + public void onResponse(final Collection responses) { + try { + GetSettingsResponse settingsResponse = extractResponse(responses, GetSettingsResponse.class); + Map indicesSettings = StreamSupport + .stream(Spliterators.spliterator(settingsResponse.getIndexToSettings().entrySet(), 0), false) + .collect(Collectors.toMap(cursor -> cursor.getKey(), cursor -> cursor.getValue())); + + ClusterStateResponse stateResponse = extractResponse(responses, ClusterStateResponse.class); + Map indicesStates = StreamSupport + .stream(stateResponse.getState().getMetadata().spliterator(), false) + .collect(Collectors.toMap(indexMetadata -> indexMetadata.getIndex().getName(), Function.identity())); + + ClusterHealthResponse healthResponse = extractResponse(responses, ClusterHealthResponse.class); + Map indicesHealths = healthResponse.getIndices(); + + IndicesStatsResponse statsResponse = extractResponse(responses, IndicesStatsResponse.class); + Map indicesStats = statsResponse.getIndices(); + + Table responseTable = buildTable(indicesSettings, indicesHealths, indicesStats, indicesStates); + listener.onResponse(responseTable); + } catch (Exception e) { + onFailure(e); + } + } + + @Override + public void onFailure(final Exception e) { + listener.onFailure(e); + } + }, size); + } + + @Override + public boolean validate(Map parameters) { + if (parameters == null || parameters.size() == 0) { + return false; + } + return true; + } + + /** + * Factory for the {@link CatIndexTool} + */ + public static class Factory implements Tool.Factory { + private Client client; + private ClusterService clusterService; + + private static Factory INSTANCE; + + /** + * Create or return the singleton factory instance + */ + public static Factory getInstance() { + if (INSTANCE != null) { + return INSTANCE; + } + synchronized (CatIndexTool.class) { + if (INSTANCE != null) { + return INSTANCE; + } + INSTANCE = new Factory(); + return INSTANCE; + } + } + + /** + * Initialize this factory + * @param client The OpenSearch client + * @param clusterService The OpenSearch cluster service + */ + public void init(Client client, ClusterService clusterService) { + this.client = client; + this.clusterService = clusterService; + } + + @Override + public CatIndexTool create(Map map) { + return new CatIndexTool(client, clusterService); + } + + @Override + public String getDefaultDescription() { + return DEFAULT_DESCRIPTION; + } + } + + private Table getTableWithHeader() { + Table table = new Table(); + table.startHeaders(); + // First param is cell.value which is currently returned + // Second param is cell.attr we may want to use attr.desc in the future + table.addCell("health", "alias:h;desc:current health status"); + table.addCell("status", "alias:s;desc:open/close status"); + table.addCell("index", "alias:i,idx;desc:index name"); + table.addCell("uuid", "alias:id,uuid;desc:index uuid"); + table.addCell("pri", "alias:p,shards.primary,shardsPrimary;text-align:right;desc:number of primary shards"); + table.addCell("rep", "alias:r,shards.replica,shardsReplica;text-align:right;desc:number of replica shards"); + table.addCell("docs.count", "alias:dc,docsCount;text-align:right;desc:available docs"); + table.addCell("docs.deleted", "alias:dd,docsDeleted;text-align:right;desc:deleted docs"); + table.addCell("store.size", "sibling:pri;alias:ss,storeSize;text-align:right;desc:store size of primaries & replicas"); + table.addCell("pri.store.size", "text-align:right;desc:store size of primaries"); + // Above includes all the default fields for cat indices. See RestIndicesAction for a lot more that could be included. + table.endHeaders(); + return table; + } + + private Table buildTable( + final Map indicesSettings, + final Map indicesHealths, + final Map indicesStats, + final Map indicesMetadatas + ) { + final Table table = getTableWithHeader(); + + indicesSettings.forEach((indexName, settings) -> { + if (indicesMetadatas.containsKey(indexName) == false) { + // the index exists in the Get Indices response but is not present in the cluster state: + // it is likely that the index was deleted in the meanwhile, so we ignore it. + return; + } + + final IndexMetadata indexMetadata = indicesMetadatas.get(indexName); + final IndexMetadata.State indexState = indexMetadata.getState(); + final IndexStats indexStats = indicesStats.get(indexName); + + final String health; + final ClusterIndexHealth indexHealth = indicesHealths.get(indexName); + if (indexHealth != null) { + health = indexHealth.getStatus().toString().toLowerCase(Locale.ROOT); + } else if (indexStats != null) { + health = "red*"; + } else { + health = ""; + } + + final CommonStats primaryStats; + final CommonStats totalStats; + + if (indexStats == null || indexState == IndexMetadata.State.CLOSE) { + primaryStats = new CommonStats(); + totalStats = new CommonStats(); + } else { + primaryStats = indexStats.getPrimaries(); + totalStats = indexStats.getTotal(); + } + table.startRow(); + table.addCell(health); + table.addCell(indexState.toString().toLowerCase(Locale.ROOT)); + table.addCell(indexName); + table.addCell(indexMetadata.getIndexUUID()); + table.addCell(indexHealth == null ? null : indexHealth.getNumberOfShards()); + table.addCell(indexHealth == null ? null : indexHealth.getNumberOfReplicas()); + + table.addCell(primaryStats.getDocs() == null ? null : primaryStats.getDocs().getCount()); + table.addCell(primaryStats.getDocs() == null ? null : primaryStats.getDocs().getDeleted()); + + table.addCell(totalStats.getStore() == null ? null : totalStats.getStore().size()); + table.addCell(primaryStats.getStore() == null ? null : primaryStats.getStore().size()); + + table.endRow(); + }); + + return table; + } + + @SuppressWarnings("unchecked") + private static A extractResponse(final Collection responses, Class c) { + return (A) responses.stream().filter(c::isInstance).findFirst().get(); + } +} diff --git a/ml-algorithms/src/test/java/org/opensearch/ml/engine/tools/CatIndexToolTests.java b/ml-algorithms/src/test/java/org/opensearch/ml/engine/tools/CatIndexToolTests.java new file mode 100644 index 0000000000..cffb0ff338 --- /dev/null +++ b/ml-algorithms/src/test/java/org/opensearch/ml/engine/tools/CatIndexToolTests.java @@ -0,0 +1,245 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.ml.engine.tools; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opensearch.Version; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.opensearch.action.admin.indices.stats.CommonStats; +import org.opensearch.action.admin.indices.stats.CommonStatsFlags; +import org.opensearch.action.admin.indices.stats.IndexStats; +import org.opensearch.action.admin.indices.stats.IndexStats.IndexStatsBuilder; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.admin.indices.stats.ShardStats; +import org.opensearch.client.AdminClient; +import org.opensearch.client.Client; +import org.opensearch.client.ClusterAdminClient; +import org.opensearch.client.IndicesAdminClient; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.health.ClusterIndexHealth; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.IndexMetadata.State; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.TestShardRouting; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.UUIDs; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.ml.common.spi.tools.Tool; +import org.opensearch.ml.engine.tools.CatIndexTool.Factory; + +public class CatIndexToolTests { + + @Mock + private Client client; + @Mock + private AdminClient adminClient; + @Mock + private IndicesAdminClient indicesAdminClient; + @Mock + private ClusterAdminClient clusterAdminClient; + @Mock + private ClusterService clusterService; + @Mock + private ClusterState clusterState; + @Mock + private Metadata metadata; + @Mock + private GetSettingsResponse getSettingsResponse; + @Mock + private IndicesStatsResponse indicesStatsResponse; + @Mock + private ClusterStateResponse clusterStateResponse; + @Mock + private ClusterHealthResponse clusterHealthResponse; + @Mock + private IndexMetadata indexMetadata; + @Mock + private IndexRoutingTable indexRoutingTable; + + private Map indicesParams; + private Map otherParams; + private Map emptyParams; + + @Before + public void setup() { + MockitoAnnotations.openMocks(this); + + when(adminClient.indices()).thenReturn(indicesAdminClient); + when(adminClient.cluster()).thenReturn(clusterAdminClient); + when(client.admin()).thenReturn(adminClient); + + when(indexMetadata.getState()).thenReturn(State.OPEN); + when(indexMetadata.getCreationVersion()).thenReturn(Version.CURRENT); + + when(metadata.index(any(String.class))).thenReturn(indexMetadata); + when(clusterState.metadata()).thenReturn(metadata); + when(clusterService.state()).thenReturn(clusterState); + + CatIndexTool.Factory.getInstance().init(client, clusterService); + + indicesParams = Map.of("index", "[\"foo\"]"); + otherParams = Map.of("other", "[\"bar\"]"); + emptyParams = Collections.emptyMap(); + } + + @Test + public void testRunAsyncNoIndices() throws Exception { + @SuppressWarnings("unchecked") + ArgumentCaptor> settingsActionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); + doNothing().when(indicesAdminClient).getSettings(any(), settingsActionListenerCaptor.capture()); + + @SuppressWarnings("unchecked") + ArgumentCaptor> statsActionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); + doNothing().when(indicesAdminClient).stats(any(), statsActionListenerCaptor.capture()); + + @SuppressWarnings("unchecked") + ArgumentCaptor> clusterStateActionListenerCaptor = ArgumentCaptor + .forClass(ActionListener.class); + doNothing().when(clusterAdminClient).state(any(), clusterStateActionListenerCaptor.capture()); + + @SuppressWarnings("unchecked") + ArgumentCaptor> clusterHealthActionListenerCaptor = ArgumentCaptor + .forClass(ActionListener.class); + doNothing().when(clusterAdminClient).health(any(), clusterHealthActionListenerCaptor.capture()); + + when(getSettingsResponse.getIndexToSettings()).thenReturn(Collections.emptyMap()); + when(indicesStatsResponse.getIndices()).thenReturn(Collections.emptyMap()); + when(clusterStateResponse.getState()).thenReturn(clusterState); + when(clusterState.getMetadata()).thenReturn(metadata); + when(metadata.spliterator()).thenReturn(Arrays.spliterator(new IndexMetadata[0])); + + when(clusterHealthResponse.getIndices()).thenReturn(Collections.emptyMap()); + + Tool tool = CatIndexTool.Factory.getInstance().create(Collections.emptyMap()); + final CompletableFuture future = new CompletableFuture<>(); + ActionListener listener = ActionListener.wrap(r -> { future.complete(r); }, e -> { future.completeExceptionally(e); }); + + tool.run(otherParams, listener); + settingsActionListenerCaptor.getValue().onResponse(getSettingsResponse); + statsActionListenerCaptor.getValue().onResponse(indicesStatsResponse); + clusterStateActionListenerCaptor.getValue().onResponse(clusterStateResponse); + clusterHealthActionListenerCaptor.getValue().onResponse(clusterHealthResponse); + + future.join(); + assertEquals("There were no results searching the indices parameter [null].", future.get()); + } + + @Test + public void testRunAsyncIndexStats() throws Exception { + String indexName = "foo"; + Index index = new Index(indexName, UUIDs.base64UUID()); + + @SuppressWarnings("unchecked") + ArgumentCaptor> settingsActionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); + doNothing().when(indicesAdminClient).getSettings(any(), settingsActionListenerCaptor.capture()); + + @SuppressWarnings("unchecked") + ArgumentCaptor> statsActionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); + doNothing().when(indicesAdminClient).stats(any(), statsActionListenerCaptor.capture()); + + @SuppressWarnings("unchecked") + ArgumentCaptor> clusterStateActionListenerCaptor = ArgumentCaptor + .forClass(ActionListener.class); + doNothing().when(clusterAdminClient).state(any(), clusterStateActionListenerCaptor.capture()); + + @SuppressWarnings("unchecked") + ArgumentCaptor> clusterHealthActionListenerCaptor = ArgumentCaptor + .forClass(ActionListener.class); + doNothing().when(clusterAdminClient).health(any(), clusterHealthActionListenerCaptor.capture()); + + when(getSettingsResponse.getIndexToSettings()).thenReturn(Map.of("foo", Settings.EMPTY)); + + int shardId = 0; + ShardId shId = new ShardId(index, shardId); + Path path = Files.createTempDirectory("temp").resolve("indices").resolve(index.getUUID()).resolve(String.valueOf(shardId)); + ShardPath shardPath = new ShardPath(false, path, path, shId); + ShardRouting routing = TestShardRouting.newShardRouting(shId, "node", true, ShardRoutingState.STARTED); + CommonStats commonStats = new CommonStats(CommonStatsFlags.ALL); + IndexStats fooStats = new IndexStatsBuilder(index.getName(), index.getUUID()) + .add(new ShardStats(routing, shardPath, commonStats, null, null, null)) + .build(); + when(indicesStatsResponse.getIndices()).thenReturn(Map.of(indexName, fooStats)); + + when(indexMetadata.getIndex()).thenReturn(index); + when(indexMetadata.getNumberOfShards()).thenReturn(5); + when(indexMetadata.getNumberOfReplicas()).thenReturn(1); + when(clusterStateResponse.getState()).thenReturn(clusterState); + when(clusterState.getMetadata()).thenReturn(metadata); + when(metadata.spliterator()).thenReturn(Arrays.spliterator(new IndexMetadata[] { indexMetadata })); + @SuppressWarnings("unchecked") + Iterator iterator = (Iterator) mock(Iterator.class); + when(iterator.hasNext()).thenReturn(false); + when(indexRoutingTable.iterator()).thenReturn(iterator); + ClusterIndexHealth fooHealth = new ClusterIndexHealth(indexMetadata, indexRoutingTable); + when(clusterHealthResponse.getIndices()).thenReturn(Map.of(indexName, fooHealth)); + + // Now make the call + Tool tool = CatIndexTool.Factory.getInstance().create(Collections.emptyMap()); + final CompletableFuture future = new CompletableFuture<>(); + ActionListener listener = ActionListener.wrap(r -> { future.complete(r); }, e -> { future.completeExceptionally(e); }); + + tool.run(otherParams, listener); + settingsActionListenerCaptor.getValue().onResponse(getSettingsResponse); + statsActionListenerCaptor.getValue().onResponse(indicesStatsResponse); + clusterStateActionListenerCaptor.getValue().onResponse(clusterStateResponse); + clusterHealthActionListenerCaptor.getValue().onResponse(clusterHealthResponse); + + future.orTimeout(10, TimeUnit.SECONDS).join(); + String response = future.get(); + String[] responseRows = response.trim().split("\\n"); + + assertEquals(2, responseRows.length); + String header = responseRows[0]; + String fooRow = responseRows[1]; + assertEquals(header.split("\\t").length, fooRow.split("\\t").length); + assertEquals("health\tstatus\tindex\tuuid\tpri\trep\tdocs.count\tdocs.deleted\tstore.size\tpri.store.size", header); + assertEquals("red\topen\tfoo\tnull\t5\t1\t0\t0\t0b\t0b", fooRow); + } + + @Test + public void testTool() { + Factory instance = CatIndexTool.Factory.getInstance(); + assertEquals(instance, CatIndexTool.Factory.getInstance()); + assertTrue(instance.getDefaultDescription().contains("tool")); + + Tool tool = instance.create(Collections.emptyMap()); + assertEquals(CatIndexTool.TYPE, tool.getType()); + assertTrue(tool.validate(indicesParams)); + assertTrue(tool.validate(otherParams)); + assertFalse(tool.validate(emptyParams)); + } +}