From b8869ef10c1de0204ce46935b3b7f945655a6186 Mon Sep 17 00:00:00 2001 From: Owais Kazi Date: Wed, 13 Sep 2023 15:50:49 -0700 Subject: [PATCH] Adds CreateIndex building block Signed-off-by: Owais Kazi --- .../flowframework/FlowFrameworkPlugin.java | 38 +++++++++++++++++- .../workflow/CreateIndexStep.java | 40 ++++++++++++++++--- .../resources/mappings/knn-index-mapping.json | 17 +++++++- 3 files changed, 87 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index f810767eb..09cae4940 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -8,11 +8,47 @@ */ package org.opensearch.flowframework; +import com.google.common.collect.ImmutableList; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.flowframework.workflow.CreateIndexStep; import org.opensearch.plugins.Plugin; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.script.ScriptService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.watcher.ResourceWatcherService; + +import java.util.Collection; +import java.util.function.Supplier; /** * An OpenSearch plugin that enables builders to innovate AI apps on OpenSearch. */ public class FlowFrameworkPlugin extends Plugin { - // Implement the relevant Plugin Interfaces here + + private Client client; + + @Override + public Collection createComponents( + Client client, + ClusterService clusterService, + ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, + ScriptService scriptService, + NamedXContentRegistry xContentRegistry, + Environment environment, + NodeEnvironment nodeEnvironment, + NamedWriteableRegistry namedWriteableRegistry, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier repositoriesServiceSupplier + ) { + this.client = client; + CreateIndexStep createIndexStep = new CreateIndexStep(client); + return ImmutableList.of(createIndexStep); + } } diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java index 3786313fb..9ebd9aade 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java @@ -10,26 +10,54 @@ import com.google.common.base.Charsets; import com.google.common.io.Resources; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; import org.opensearch.action.admin.indices.create.CreateIndexRequest; -import org.opensearch.client.AdminClient; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.client.Client; import org.opensearch.common.xcontent.XContentType; import java.io.IOException; import java.net.URL; import java.util.concurrent.CompletableFuture; -public class CreateIndexStep implements Workflow { +public class CreateIndexStep implements WorkflowStep { - AdminClient adminClient; + private static final Logger logger = LogManager.getLogger(CreateIndexStep.class); + private Client client; + private final String CREATE_INDEX_STEP = "create_index_step"; + + public CreateIndexStep(Client client) { + this.client = client; + } @Override - public CompletableFuture execute() throws Exception { + public CompletableFuture execute(WorkflowData data) { + + ActionListener actionListener = new ActionListener<>() { + + @Override + public void onResponse(CreateIndexResponse createIndexResponse) { + logger.info("created index:{}"); + } - // ActionListener actionListener + @Override + public void onFailure(Exception e) { + logger.error("Index creation failed", e); + } + }; + + // Fetch indexName, fileName and settings from WorkflowData CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(getIndexMappings(fileName), XContentType.JSON) .settings(settings); - adminClient.indices().create(request, actionListener); + client.admin().indices().create(request, actionListener); + return null; + } + @Override + public String getName() { + return CREATE_INDEX_STEP; } /** diff --git a/src/main/resources/mappings/knn-index-mapping.json b/src/main/resources/mappings/knn-index-mapping.json index 8b1378917..c31946e62 100644 --- a/src/main/resources/mappings/knn-index-mapping.json +++ b/src/main/resources/mappings/knn-index-mapping.json @@ -1 +1,16 @@ - +{ + "properties": { + "desc_v": { + "type": "keyword" + }, + "name_v": { + "type": "keyword" + }, + "description": { + "type": "keyword" + }, + "name": { + "type": "keyword" + } + } +}