Skip to content

Commit

Permalink
Addressed PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Owais Kazi <[email protected]>
  • Loading branch information
owaiskazi19 committed Sep 19, 2023
1 parent 44dc4a4 commit b76e7f7
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
import com.google.common.io.Resources;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.flowframework.workflow.WorkflowStep;

Expand All @@ -26,12 +27,19 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Step to create an index
*/
public class CreateIndexStep implements WorkflowStep {

private static final Logger logger = LogManager.getLogger(CreateIndexStep.class);
private Client client;
private final String CREATE_INDEX_STEP = "create_index_step";
private final String NAME = "create_index_step";

/**
* Instantiate this class
* @param client Client to create an index
*/
public CreateIndexStep(Client client) {
this.client = client;
}
Expand All @@ -43,7 +51,7 @@ public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {

@Override
public void onResponse(CreateIndexResponse createIndexResponse) {
logger.info("created index:{}");
logger.info("created index:{}", createIndexResponse.index());
future.complete(new WorkflowData() {
@Override
public Map<String, Object> getContent() {
Expand All @@ -54,41 +62,54 @@ public Map<String, Object> getContent() {

@Override
public void onFailure(Exception e) {
logger.error("Index creation failed", e);
logger.error("Failed to create an index", e);
future.completeExceptionally(e);
}
};

String index = null;
String type = null;
Settings settings = null;

for (WorkflowData workflowData : data) {
// Fetch index from content i.e. request body of execute API
Map<String, Object> content = workflowData.getContent();
index = (String) content.get("index");
type = (String) content.get("type");
settings = (Settings) content.get("settings");
if (index != null && type != null) {
break;
}
}

// TODO:
// 1. Map index type -> fileName
// 2. Create settings based on the index settings received from content
CreateIndexRequest request = new CreateIndexRequest(index).mapping(getIndexMappings(fileName), XContentType.JSON)
.settings(settings);
client.admin().indices().create(request, actionListener);
// 1. Create settings based on the index settings received from content

try {
CreateIndexRequest request = new CreateIndexRequest(index).mapping(getIndexMappings(type), XContentType.JSON)
.settings(settings);
client.admin().indices().create(request, actionListener);
} catch (Exception e) {
logger.error("Failed to find the right mapping for the index", e);
}

return future;
}

@Override
public String getName() {
return CREATE_INDEX_STEP;
return NAME;
}

/**
* Get index mapping json content.
*
* @param mapping type of the index to fetch the specific mapping file
* @return index mapping
* @throws IOException IOException if mapping file can't be read correctly
*/
public static String getIndexMappings(String mappingFileName) throws IOException {
URL url = CreateIndexStep.class.getClassLoader().getResource(mappingFileName);
public static String getIndexMappings(String mapping) throws IOException {
URL url = CreateIndexStep.class.getClassLoader().getResource(mapping);
return Resources.toString(url, Charsets.UTF_8);
}
}
File renamed without changes.

0 comments on commit b76e7f7

Please sign in to comment.