Skip to content

Commit

Permalink
Integrated WorkflowData and made the request async
Browse files Browse the repository at this point in the history
Signed-off-by: Owais Kazi <[email protected]>
  • Loading branch information
owaiskazi19 committed Sep 19, 2023
1 parent b8869ef commit f759bea
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.flowframework.workflow.CreateIndexStep;
import org.opensearch.flowframework.workflow.CreateIndex.CreateIndexStep;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow;
package org.opensearch.flowframework.workflow.CreateIndex;

import com.google.common.base.Charsets;
import com.google.common.io.Resources;
Expand All @@ -17,9 +17,13 @@
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.client.Client;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.flowframework.workflow.WorkflowStep;

import java.io.IOException;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class CreateIndexStep implements WorkflowStep {
Expand All @@ -33,26 +37,43 @@ public CreateIndexStep(Client client) {
}

@Override
public CompletableFuture<WorkflowData> execute(WorkflowData data) {

public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {
CompletableFuture<WorkflowData> future = new CompletableFuture<>();
ActionListener<CreateIndexResponse> actionListener = new ActionListener<>() {

@Override
public void onResponse(CreateIndexResponse createIndexResponse) {
logger.info("created index:{}");
future.complete(new WorkflowData() {
@Override
public Map<String, Object> getContent() {
return Map.of("index", createIndexResponse.index());
}
});
}

@Override
public void onFailure(Exception e) {
logger.error("Index creation failed", e);
future.completeExceptionally(e);
}
};

// Fetch indexName, fileName and settings from WorkflowData
CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(getIndexMappings(fileName), XContentType.JSON)
String index = null;

for (WorkflowData workflowData : data) {
// Fetch index from content i.e. request body of execute API
Map<String, Object> content = workflowData.getContent();
index = (String) content.get("index");
}

// TODO:
// 1. Map index type -> fileName
// 2. Create settings based on the index settings received from content
CreateIndexRequest request = new CreateIndexRequest(index).mapping(getIndexMappings(fileName), XContentType.JSON)
.settings(settings);
client.admin().indices().create(request, actionListener);
return null;
return future;
}

@Override
Expand Down

0 comments on commit f759bea

Please sign in to comment.