Skip to content

Commit

Permalink
Integrate thread pool executor service
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Sep 26, 2023
1 parent 078059b commit 90e95e3
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 47 deletions.
10 changes: 8 additions & 2 deletions src/main/java/demo/DataDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.flowframework.template.ProcessNode;
import org.opensearch.flowframework.template.Template;
import org.opensearch.flowframework.template.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
Expand All @@ -23,6 +24,8 @@
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/**
Expand All @@ -48,10 +51,13 @@ public static void main(String[] args) throws IOException {
logger.error("Failed to read JSON at path {}", path);
return;
}
WorkflowStepFactory factory = WorkflowStepFactory.create(null);
ExecutorService executor = Executors.newFixedThreadPool(10);
WorkflowProcessSorter.create(factory, executor);

logger.info("Parsing graph to sequence...");
Template t = Template.parse(json);
List<ProcessNode> processSequence = WorkflowProcessSorter.sortProcessNodes(t.workflows().get("datademo"));
List<ProcessNode> processSequence = WorkflowProcessSorter.get().sortProcessNodes(t.workflows().get("datademo"));
List<CompletableFuture<?>> futureList = new ArrayList<>();

for (ProcessNode n : processSequence) {
Expand All @@ -71,6 +77,6 @@ public static void main(String[] args) throws IOException {
}
futureList.forEach(CompletableFuture::join);
logger.info("All done!");
executor.shutdown();
}

}
10 changes: 8 additions & 2 deletions src/main/java/demo/Demo.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.flowframework.template.ProcessNode;
import org.opensearch.flowframework.template.Template;
import org.opensearch.flowframework.template.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
Expand All @@ -23,6 +24,8 @@
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/**
Expand All @@ -48,10 +51,13 @@ public static void main(String[] args) throws IOException {
logger.error("Failed to read JSON at path {}", path);
return;
}
WorkflowStepFactory factory = WorkflowStepFactory.create(null);
ExecutorService executor = Executors.newFixedThreadPool(10);
WorkflowProcessSorter.create(factory, executor);

logger.info("Parsing graph to sequence...");
Template t = Template.parse(json);
List<ProcessNode> processSequence = WorkflowProcessSorter.sortProcessNodes(t.workflows().get("demo"));
List<ProcessNode> processSequence = WorkflowProcessSorter.get().sortProcessNodes(t.workflows().get("demo"));
List<CompletableFuture<?>> futureList = new ArrayList<>();

for (ProcessNode n : processSequence) {
Expand All @@ -71,6 +77,6 @@ public static void main(String[] args) throws IOException {
}
futureList.forEach(CompletableFuture::join);
logger.info("All done!");
executor.shutdown();
}

}
6 changes: 5 additions & 1 deletion src/main/java/demo/TemplateParseDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
import org.opensearch.flowframework.template.Template;
import org.opensearch.flowframework.template.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.Workflow;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Map.Entry;
import java.util.concurrent.Executors;

/**
* Demo class exercising {@link WorkflowProcessSorter}. This will be moved to a unit test.
Expand All @@ -44,6 +46,8 @@ public static void main(String[] args) throws IOException {
logger.error("Failed to read JSON at path {}", path);
return;
}
WorkflowStepFactory factory = WorkflowStepFactory.create(null);
WorkflowProcessSorter.create(factory, Executors.newFixedThreadPool(10));

Template t = Template.parse(json);

Expand All @@ -52,7 +56,7 @@ public static void main(String[] args) throws IOException {

for (Entry<String, Workflow> e : t.workflows().entrySet()) {
logger.info("Parsing {} workflow.", e.getKey());
WorkflowProcessSorter.sortProcessNodes(e.getValue());
WorkflowProcessSorter.get().sortProcessNodes(e.getValue());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
<<<<<<< HEAD
import org.opensearch.flowframework.workflow.CreateIndex.CreateIndexStep;
import org.opensearch.flowframework.workflow.CreateIngestPipelineStep;
=======
import org.opensearch.flowframework.template.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
>>>>>>> abffd2d (Integrate thread pool executor service)
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
Expand All @@ -32,8 +37,6 @@
*/
public class FlowFrameworkPlugin extends Plugin {

private Client client;

@Override
public Collection<Object> createComponents(
Client client,
Expand All @@ -48,9 +51,9 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.client = client;
CreateIngestPipelineStep createIngestPipelineStep = new CreateIngestPipelineStep(client);
CreateIndexStep createIndexStep = new CreateIndexStep(client);
return ImmutableList.of(createIngestPipelineStep, createIndexStep);
WorkflowStepFactory workflowStepFactory = WorkflowStepFactory.create(client);
WorkflowProcessSorter workflowProcessSorter = WorkflowProcessSorter.create(workflowStepFactory, threadPool.generic());

return ImmutableList.of(workflowStepFactory, workflowProcessSorter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand All @@ -31,6 +32,8 @@ public class ProcessNode {
private final WorkflowStep workflowStep;
private final WorkflowData input;
private final List<ProcessNode> predecessors;
private Executor executor;

private final CompletableFuture<WorkflowData> future = new CompletableFuture<>();

/**
Expand All @@ -40,12 +43,14 @@ public class ProcessNode {
* @param workflowStep A java class implementing {@link WorkflowStep} to be executed when it's this node's turn.
* @param input Input required by the node encoded in a {@link WorkflowData} instance.
* @param predecessors Nodes preceding this one in the workflow
* @param executor The OpenSearch thread pool
*/
public ProcessNode(String id, WorkflowStep workflowStep, WorkflowData input, List<ProcessNode> predecessors) {
public ProcessNode(String id, WorkflowStep workflowStep, WorkflowData input, List<ProcessNode> predecessors, Executor executor) {
this.id = id;
this.workflowStep = workflowStep;
this.input = input;
this.predecessors = predecessors;
this.executor = executor;
}

/**
Expand Down Expand Up @@ -135,7 +140,7 @@ public CompletableFuture<WorkflowData> execute() {
} catch (InterruptedException | ExecutionException e) {
handleException(e);
}
});
}, executor);
return this.future;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand All @@ -34,23 +35,55 @@ public class WorkflowProcessSorter {

private static final Logger logger = LogManager.getLogger(WorkflowProcessSorter.class);

private static WorkflowProcessSorter instance = null;

private WorkflowStepFactory workflowStepFactory;
private Executor executor;

/**
* Prevent instantiating this class.
* Create the singleton instance of this class. Throws an {@link IllegalStateException} if already created.
*
* @param workflowStepFactory The singleton instance of {@link WorkflowStepFactory}
* @param executor A thread executor
* @return The created instance
*/
private WorkflowProcessSorter() {}
public static synchronized WorkflowProcessSorter create(WorkflowStepFactory workflowStepFactory, Executor executor) {
if (instance != null) {
throw new IllegalStateException("This class was already created.");
}
instance = new WorkflowProcessSorter(workflowStepFactory, executor);
return instance;
}

/**
* Gets the singleton instance of this class. Throws an {@link IllegalStateException} if not yet created.
*
* @return The created instance
*/
public static synchronized WorkflowProcessSorter get() {
if (instance == null) {
throw new IllegalStateException("This factory has not yet been created.");
}
return instance;
}

private WorkflowProcessSorter(WorkflowStepFactory workflowStepFactory, Executor executor) {
this.workflowStepFactory = workflowStepFactory;
this.executor = executor;
}

/**
* Sort a workflow into a topologically sorted list of process nodes.
* @param workflow A workflow with (unsorted) nodes and edges which define predecessors and successors
* @return A list of Process Nodes sorted topologically. All predecessors of any node will occur prior to it in the list.
*/
public static List<ProcessNode> sortProcessNodes(Workflow workflow) {
public List<ProcessNode> sortProcessNodes(Workflow workflow) {
List<WorkflowNode> sortedNodes = topologicalSort(workflow.nodes(), workflow.edges());

List<ProcessNode> nodes = new ArrayList<>();
Map<String, ProcessNode> idToNodeMap = new HashMap<>();
for (WorkflowNode node : sortedNodes) {
WorkflowStep step = WorkflowStepFactory.get().createStep(node.type());
WorkflowStep step = workflowStepFactory.createStep(node.type());
WorkflowData data = new WorkflowData(node.inputs(), workflow.userParams());
List<ProcessNode> predecessorNodes = workflow.edges()
.stream()
Expand All @@ -59,7 +92,7 @@ public static List<ProcessNode> sortProcessNodes(Workflow workflow) {
.map(e -> idToNodeMap.get(e.source()))
.collect(Collectors.toList());

ProcessNode processNode = new ProcessNode(node.id(), step, data, predecessorNodes);
ProcessNode processNode = new ProcessNode(node.id(), step, data, predecessorNodes, executor);
idToNodeMap.put(processNode.id(), processNode);
nodes.add(processNode);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.flowframework.workflow.WorkflowStep;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;

import java.io.IOException;
import java.net.URL;
Expand All @@ -34,7 +35,9 @@ public class CreateIndexStep implements WorkflowStep {

private static final Logger logger = LogManager.getLogger(CreateIndexStep.class);
private Client client;
private final String NAME = "create_index_step";

/** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */
public static final String NAME = "create_index_step";

/**
* Instantiate this class
Expand Down Expand Up @@ -79,6 +82,7 @@ public void onFailure(Exception e) {
// 1. Create settings based on the index settings received from content

try {
// TODO: mapping() is deprecated
CreateIndexRequest request = new CreateIndexRequest(index).mapping(
getIndexMappings("mappings/" + type + ".json"),
XContentType.JSON
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,36 +8,60 @@
*/
package org.opensearch.flowframework.workflow;

import org.opensearch.client.Client;
import org.opensearch.flowframework.workflow.CreateIndex.CreateIndexStep;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import demo.CreateIndexWorkflowStep;
import demo.DemoWorkflowStep;

/**
* Generates instances implementing {@link WorkflowStep}.
*/
public class WorkflowStepFactory {

private static final WorkflowStepFactory INSTANCE = new WorkflowStepFactory();
private static WorkflowStepFactory instance = null;

private Client client;
private final Map<String, WorkflowStep> stepMap = new HashMap<>();

private WorkflowStepFactory() {
populateMap();
/**
* Create the singleton instance of this class. Throws an {@link IllegalStateException} if already created.
*
* @param client The OpenSearch client steps can use
* @return The created instance
*/
public static synchronized WorkflowStepFactory create(Client client) {
if (instance != null) {
throw new IllegalStateException("This factory was already created.");
}
instance = new WorkflowStepFactory(client);
return instance;
}

/**
* Gets the singleton instance of this class
* @return The instance of this class
* Gets the singleton instance of this class. Throws an {@link IllegalStateException} if not yet created.
*
* @return The created instance
*/
public static WorkflowStepFactory get() {
return INSTANCE;
public static synchronized WorkflowStepFactory get() {
if (instance == null) {
throw new IllegalStateException("This factory has not yet been created.");
}
return instance;
}

private WorkflowStepFactory(Client client) {
this.client = client;
populateMap();
}

private void populateMap() {
stepMap.put(CreateIndexStep.NAME, new CreateIndexStep(client));

// TODO: These are from the demo class as placeholders
// Replace with actual implementations such as
// https://github.com/opensearch-project/opensearch-ai-flow-framework/pull/38
Expand Down Expand Up @@ -69,13 +93,8 @@ public String getName() {
* @return an instance of the specified type
*/
public WorkflowStep createStep(String type) {
switch (type) {
case "create_index":
return new CreateIndexWorkflowStep();
default:
if (stepMap.containsKey(type)) {
return stepMap.get(type);
}
if (stepMap.containsKey(type)) {
return stepMap.get(type);
}
// TODO: replace this with a FlowFrameworkException
// https://github.com/opensearch-project/opensearch-ai-flow-framework/pull/43
Expand Down
Loading

0 comments on commit 90e95e3

Please sign in to comment.