Skip to content

Commit

Permalink
Implement Index Mapping Tool (opensearch-project#1609)
Browse files Browse the repository at this point in the history
* IndexMappingTool implementation

Signed-off-by: Daniel Widdis <[email protected]>

* Immediately fail if no index parameter

Signed-off-by: Daniel Widdis <[email protected]>

* Fix test input

Signed-off-by: Daniel Widdis <[email protected]>

* Remove unused modelId

Signed-off-by: Daniel Widdis <[email protected]>

* Remove unused clusterService

Signed-off-by: Daniel Widdis <[email protected]>

* Add test coverage of "no results" case

Signed-off-by: Daniel Widdis <[email protected]>

* Rename map variable to match its content

Signed-off-by: Daniel Widdis <[email protected]>

---------

Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis authored Nov 22, 2023
1 parent fd26f46 commit abac194
Show file tree
Hide file tree
Showing 2 changed files with 372 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ml.engine.tools;

import lombok.Getter;
import lombok.Setter;
import org.apache.logging.log4j.util.Strings;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.ml.common.output.model.ModelTensors;
import org.opensearch.ml.common.spi.tools.Parser;
import org.opensearch.ml.common.spi.tools.Tool;
import org.opensearch.ml.common.spi.tools.ToolAnnotation;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import static org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest.DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT;
import static org.opensearch.ml.common.utils.StringUtils.gson;

@ToolAnnotation(IndexMappingTool.NAME)
public class IndexMappingTool implements Tool {
public static final String NAME = "IndexMappingTool";

private static final String DEFAULT_DESCRIPTION = "Use this tool to get index mapping information.";
@Setter
@Getter
private String name = IndexMappingTool.NAME;
@Getter
@Setter
private String description = DEFAULT_DESCRIPTION;
@Getter
private String type;
@Getter
private String version;
private Client client;
@Setter
private Parser<?, ?> inputParser;
@Setter
private Parser<?, ?> outputParser;

public IndexMappingTool(Client client) {
this.client = client;

outputParser = new Parser<>() {
@Override
public Object parse(Object o) {
@SuppressWarnings("unchecked")
List<ModelTensors> mlModelOutputs = (List<ModelTensors>) o;
return mlModelOutputs.get(0).getMlModelTensors().get(0).getDataAsMap().get("response");
}
};
}

@Override
public <T> void run(Map<String, String> parameters, ActionListener<T> listener) {
@SuppressWarnings("unchecked")
List<String> indexList = parameters.containsKey("index")
? gson.fromJson(parameters.get("index"), List.class)
: Collections.emptyList();
if (indexList.isEmpty()) {
@SuppressWarnings("unchecked")
T empty = (T) ("There were no results searching the index parameter [" + parameters.get("index") + "].");
listener.onResponse(empty);
return;
}

final String[] indices = indexList.toArray(Strings.EMPTY_ARRAY);

final IndicesOptions indicesOptions = IndicesOptions.strictExpand();
final boolean local = parameters.containsKey("local") ? Boolean.parseBoolean("local") : false;
final TimeValue clusterManagerNodeTimeout = DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT;

ActionListener<GetIndexResponse> internalListener = new ActionListener<GetIndexResponse>() {

@Override
public void onResponse(GetIndexResponse getIndexResponse) {
try {
// Handle empty response
if (getIndexResponse.indices().length == 0) {
@SuppressWarnings("unchecked")
T empty = (T) ("There were no results searching the index parameter [" + parameters.get("index") + "].");
listener.onResponse(empty);
return;
}
StringBuilder sb = new StringBuilder();
for (String index : getIndexResponse.indices()) {
sb.append("index: ").append(index).append("\n\n");

MappingMetadata mapping = getIndexResponse.mappings().get(index);
if (mapping != null) {
sb.append("mappings:\n");
for (Entry<String, Object> entry: mapping.sourceAsMap().entrySet()) {
sb.append(entry.getKey()).append("=").append(entry.getValue()).append('\n');
}
sb.append("\n\n");
}

Settings settings = getIndexResponse.settings().get(index);
if (settings != null) {
sb.append("settings:\n").append(settings.toDelimitedString('\n')).append("\n\n");
}
}

@SuppressWarnings("unchecked")
T response = (T) sb.toString();
listener.onResponse(response);
} catch (Exception e) {
onFailure(e);
}
}

@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
}

};
final GetIndexRequest getIndexRequest = new GetIndexRequest().indices(indices)
.indicesOptions(indicesOptions)
.local(local)
.clusterManagerNodeTimeout(clusterManagerNodeTimeout);

client.admin().indices().getIndex(getIndexRequest, internalListener);
}

@Override
public boolean validate(Map<String, String> parameters) {
if (parameters == null || parameters.size() == 0) {
return false;
}
return true;
}

/**
* Factory for the {@link IndexMappingTool}
*/
public static class Factory implements Tool.Factory<IndexMappingTool> {
private Client client;

private static Factory INSTANCE;

/**
* Create or return the singleton factory instance
*/
public static Factory getInstance() {
if (INSTANCE != null) {
return INSTANCE;
}
synchronized (IndexMappingTool.class) {
if (INSTANCE != null) {
return INSTANCE;
}
INSTANCE = new Factory();
return INSTANCE;
}
}

/**
* Initialize this factory
* @param client The OpenSearch client
*/
public void init(Client client) {
this.client = client;
}

@Override
public IndexMappingTool create(Map<String, Object> map) {
return new IndexMappingTool(client);
}

@Override
public String getDefaultDescription() {
return DEFAULT_DESCRIPTION;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ml.engine.tools;

import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.ml.common.spi.tools.Tool;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.client.IndicesAdminClient;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.json.JsonXContent;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when;

public class IndexMappingToolTests {

@Mock
private Client client;
@Mock
private AdminClient adminClient;
@Mock
private IndicesAdminClient indicesAdminClient;
@Mock
private MappingMetadata mappingMetadata;
@Mock
private GetIndexResponse getIndexResponse;

private Map<String, String> indexParams;
private Map<String, String> otherParams;
private Map<String, String> emptyParams;

@Before
public void setup() {
MockitoAnnotations.openMocks(this);

when(adminClient.indices()).thenReturn(indicesAdminClient);
when(client.admin()).thenReturn(adminClient);

IndexMappingTool.Factory.getInstance().init(client);

indexParams = Map.of("index", "[\"foo\"]");
otherParams = Map.of("other", "[\"bar\"]");
emptyParams = Collections.emptyMap();
}


@Test
public void testRunAsyncNoIndexParams() throws Exception {
Tool tool = IndexMappingTool.Factory.getInstance().create(Collections.emptyMap());
final CompletableFuture<String> future = new CompletableFuture<>();
ActionListener<String> listener = ActionListener.wrap(r -> { future.complete(r); }, e -> { future.completeExceptionally(e); });

tool.run(emptyParams, listener);

future.join();
assertEquals("There were no results searching the index parameter [null].", future.get());
}

@Test
public void testRunAsyncNoIndices() throws Exception {
Tool tool = IndexMappingTool.Factory.getInstance().create(Collections.emptyMap());
final CompletableFuture<String> future = new CompletableFuture<>();
ActionListener<String> listener = ActionListener.wrap(r -> { future.complete(r); }, e -> { future.completeExceptionally(e); });

tool.run(otherParams, listener);

future.join();
assertEquals("There were no results searching the index parameter [null].", future.get());
}

@Test
public void testRunAsyncNoResults() throws Exception {
@SuppressWarnings("unchecked")
ArgumentCaptor<ActionListener<GetIndexResponse>> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class);
doNothing().when(indicesAdminClient).getIndex(any(), actionListenerCaptor.capture());

Tool tool = IndexMappingTool.Factory.getInstance().create(Collections.emptyMap());
final CompletableFuture<String> future = new CompletableFuture<>();
ActionListener<String> listener = ActionListener.wrap(r -> { future.complete(r); }, e -> { future.completeExceptionally(e); });

when(getIndexResponse.indices()).thenReturn(Strings.EMPTY_ARRAY);

tool.run(indexParams, listener);
actionListenerCaptor.getValue().onResponse(getIndexResponse);

future.join();
assertEquals("There were no results searching the index parameter [[\"foo\"]].", future.get());
}

@Test
public void testRunAsyncIndexMapping() throws Exception {
String indexName = "foo";

@SuppressWarnings("unchecked")
ArgumentCaptor<ActionListener<GetIndexResponse>> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class);
doNothing().when(indicesAdminClient).getIndex(any(), actionListenerCaptor.capture());

when(getIndexResponse.indices()).thenReturn(new String[] { indexName });
Settings settings = Settings.builder().put("test.boolean.setting", false).put("test.int.setting", 123).build();
when(getIndexResponse.settings()).thenReturn(Map.of(indexName, settings));
String source = "{"
+ " \"foo\" : {"
+ " \"mappings\" : {"
+ " \"year\" : {"
+ " \"full_name\" : \"year\","
+ " \"mapping\" : {"
+ " \"year\" : {"
+ " \"type\" : \"text\""
+ " }"
+ " }"
+ " },"
+ " \"age\" : {"
+ " \"full_name\" : \"age\","
+ " \"mapping\" : {"
+ " \"age\" : {"
+ " \"type\" : \"integer\""
+ " }"
+ " }"
+ " }"
+ " }"
+ " }"
+ "}";
MappingMetadata mapping = new MappingMetadata(indexName, XContentHelper.convertToMap(JsonXContent.jsonXContent, source, true));
when(getIndexResponse.mappings()).thenReturn(Map.of(indexName, mapping));

// Now make the call
Tool tool = IndexMappingTool.Factory.getInstance().create(Collections.emptyMap());
final CompletableFuture<String> future = new CompletableFuture<>();
ActionListener<String> listener = ActionListener.wrap(r -> { future.complete(r); }, e -> { future.completeExceptionally(e); });

tool.run(indexParams, listener);
actionListenerCaptor.getValue().onResponse(getIndexResponse);

future.orTimeout(10, TimeUnit.SECONDS).join();
String response = future.get();
List<String> responseList = Arrays.asList(response.trim().split("\\n"));

assertTrue(responseList.contains("index: foo"));

assertTrue(responseList.contains("mappings:"));
assertTrue(
responseList.contains(
"mappings={year={full_name=year, mapping={year={type=text}}}, age={full_name=age, mapping={age={type=integer}}}}"
)
);

assertTrue(responseList.contains("settings:"));
assertTrue(responseList.contains("test.boolean.setting=false"));
assertTrue(responseList.contains("test.int.setting=123"));
}

@Test
public void testTool() {
Tool tool = IndexMappingTool.Factory.getInstance().create(Collections.emptyMap());
assertEquals(IndexMappingTool.NAME, tool.getName());
assertTrue(tool.validate(indexParams));
assertTrue(tool.validate(otherParams));
assertFalse(tool.validate(emptyParams));
}
}

0 comments on commit abac194

Please sign in to comment.