Skip to content

Commit

Permalink
Add timeout for node execution (#66)
Browse files Browse the repository at this point in the history
* Add timeout for node execution

Signed-off-by: Daniel Widdis <[email protected]>

* Properly implement delays using OpenSearch ThreadPool

Signed-off-by: Daniel Widdis <[email protected]>

* Add coverage for plugin class, make test threshold dynamic

Signed-off-by: Daniel Widdis <[email protected]>

* Tests don't like singletons

Signed-off-by: Daniel Widdis <[email protected]>

* More thorough ProcessNode testing

Signed-off-by: Daniel Widdis <[email protected]>

* Util method for timeout parsing

Signed-off-by: Daniel Widdis <[email protected]>

---------

Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis authored Oct 3, 2023
1 parent 0ed717d commit 28326bd
Show file tree
Hide file tree
Showing 14 changed files with 371 additions and 154 deletions.
9 changes: 6 additions & 3 deletions .codecov.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
codecov:
require_ci_to_pass: yes
require_ci_to_pass: true

# ignore files in demo package
ignore:
Expand All @@ -12,5 +12,8 @@ coverage:
status:
project:
default:
target: 70% # the required coverage value
threshold: 1% # the leniency in hitting the target
target: auto
threshold: 2% # project coverage can drop
patch:
default:
target: 70% # required diff coverage value
18 changes: 11 additions & 7 deletions src/main/java/demo/Demo.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.workflow.ProcessNode;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
Expand All @@ -26,8 +28,7 @@
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
Expand All @@ -37,6 +38,8 @@ public class Demo {

private static final Logger logger = LogManager.getLogger(Demo.class);

private Demo() {}

/**
* Demonstrate parsing a JSON graph.
*
Expand All @@ -54,13 +57,14 @@ public static void main(String[] args) throws IOException {
return;
}
Client client = new NodeClient(null, null);
WorkflowStepFactory factory = WorkflowStepFactory.create(client);
ExecutorService executor = Executors.newFixedThreadPool(10);
WorkflowProcessSorter.create(factory, executor);
WorkflowStepFactory factory = new WorkflowStepFactory(client);

ThreadPool threadPool = new ThreadPool(Settings.EMPTY);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(factory, threadPool);

logger.info("Parsing graph to sequence...");
Template t = Template.parse(json);
List<ProcessNode> processSequence = WorkflowProcessSorter.get().sortProcessNodes(t.workflows().get("demo"));
List<ProcessNode> processSequence = workflowProcessSorter.sortProcessNodes(t.workflows().get("demo"));
List<CompletableFuture<?>> futureList = new ArrayList<>();

for (ProcessNode n : processSequence) {
Expand All @@ -80,6 +84,6 @@ public static void main(String[] args) throws IOException {
}
futureList.forEach(CompletableFuture::join);
logger.info("All done!");
executor.shutdown();
ThreadPool.terminate(threadPool, 500, TimeUnit.MILLISECONDS);
}
}
14 changes: 10 additions & 4 deletions src/main/java/demo/TemplateParseDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,18 @@
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Map.Entry;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* Demo class exercising {@link WorkflowProcessSorter}. This will be moved to a unit test.
Expand All @@ -32,6 +34,8 @@ public class TemplateParseDemo {

private static final Logger logger = LogManager.getLogger(TemplateParseDemo.class);

private TemplateParseDemo() {}

/**
* Demonstrate parsing a JSON graph.
*
Expand All @@ -49,8 +53,9 @@ public static void main(String[] args) throws IOException {
return;
}
Client client = new NodeClient(null, null);
WorkflowStepFactory factory = WorkflowStepFactory.create(client);
WorkflowProcessSorter.create(factory, Executors.newFixedThreadPool(10));
WorkflowStepFactory factory = new WorkflowStepFactory(client);
ThreadPool threadPool = new ThreadPool(Settings.EMPTY);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(factory, threadPool);

Template t = Template.parse(json);

Expand All @@ -59,7 +64,8 @@ public static void main(String[] args) throws IOException {

for (Entry<String, Workflow> e : t.workflows().entrySet()) {
logger.info("Parsing {} workflow.", e.getKey());
WorkflowProcessSorter.get().sortProcessNodes(e.getValue());
workflowProcessSorter.sortProcessNodes(e.getValue());
}
ThreadPool.terminate(threadPool, 500, TimeUnit.MILLISECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
*/
public class FlowFrameworkPlugin extends Plugin {

/**
* Instantiate this plugin.
*/
public FlowFrameworkPlugin() {}

@Override
public Collection<Object> createComponents(
Client client,
Expand All @@ -46,8 +51,8 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
WorkflowStepFactory workflowStepFactory = WorkflowStepFactory.create(client);
WorkflowProcessSorter workflowProcessSorter = WorkflowProcessSorter.create(workflowStepFactory, threadPool.generic());
WorkflowStepFactory workflowStepFactory = new WorkflowStepFactory(client);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool);

return ImmutableList.of(workflowStepFactory, workflowProcessSorter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ public class WorkflowNode implements ToXContentObject {
public static final String INPUTS_FIELD = "inputs";
/** The field defining processors in the inputs for search and ingest pipelines */
public static final String PROCESSORS_FIELD = "processors";
/** The field defining the timeout value for this node */
public static final String NODE_TIMEOUT_FIELD = "node_timeout";
/** The default timeout value if the template doesn't override it */
public static final String NODE_TIMEOUT_DEFAULT_VALUE = "10s";

private final String id; // unique id
private final String type; // maps to a WorkflowStep
Expand Down
115 changes: 68 additions & 47 deletions src/main/java/org/opensearch/flowframework/workflow/ProcessNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,19 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.threadpool.Scheduler.ScheduledCancellable;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

/**
* Representation of a process node in a workflow graph. Tracks predecessor nodes which must be completed before it can start execution.
* Representation of a process node in a workflow graph.
* Tracks predecessor nodes which must be completed before it can start execution.
*/
public class ProcessNode {

Expand All @@ -30,7 +32,8 @@ public class ProcessNode {
private final WorkflowStep workflowStep;
private final WorkflowData input;
private final List<ProcessNode> predecessors;
private Executor executor;
private final ThreadPool threadPool;
private final TimeValue nodeTimeout;

private final CompletableFuture<WorkflowData> future = new CompletableFuture<>();

Expand All @@ -41,14 +44,23 @@ public class ProcessNode {
* @param workflowStep A java class implementing {@link WorkflowStep} to be executed when it's this node's turn.
* @param input Input required by the node encoded in a {@link WorkflowData} instance.
* @param predecessors Nodes preceding this one in the workflow
* @param executor The OpenSearch thread pool
* @param threadPool The OpenSearch thread pool
* @param nodeTimeout The timeout value for executing on this node
*/
public ProcessNode(String id, WorkflowStep workflowStep, WorkflowData input, List<ProcessNode> predecessors, Executor executor) {
public ProcessNode(
String id,
WorkflowStep workflowStep,
WorkflowData input,
List<ProcessNode> predecessors,
ThreadPool threadPool,
TimeValue nodeTimeout
) {
this.id = id;
this.workflowStep = workflowStep;
this.input = input;
this.predecessors = predecessors;
this.executor = executor;
this.threadPool = threadPool;
this.nodeTimeout = nodeTimeout;
}

/**
Expand Down Expand Up @@ -90,64 +102,73 @@ public CompletableFuture<WorkflowData> future() {
* Returns the predecessors of this node in the workflow.
* The predecessor's {@link #future()} must complete before execution begins on this node.
*
* @return a set of predecessor nodes, if any. At least one node in the graph must have no predecessors and serve as a start node.
* @return a set of predecessor nodes, if any.
* At least one node in the graph must have no predecessors and serve as a start node.
*/
public List<ProcessNode> predecessors() {
return predecessors;
}

/**
* Execute this node in the sequence. Initializes the node's {@link CompletableFuture} and completes it when the process completes.
* Returns the timeout value of this node in the workflow. A value of {@link TimeValue#ZERO} means no timeout.
* @return The node's timeout value.
*/
public TimeValue nodeTimeout() {
return nodeTimeout;
}

/**
* Execute this node in the sequence.
* Initializes the node's {@link CompletableFuture} and completes it when the process completes.
*
* @return this node's future. This is returned immediately, while process execution continues asynchronously.
* @return this node's future.
* This is returned immediately, while process execution continues asynchronously.
*/
public CompletableFuture<WorkflowData> execute() {
// TODO this class will be instantiated with the OpenSearch thread pool (or one for tests!)
// the generic executor from that pool should be passed to this runAsync call
// https://github.com/opensearch-project/opensearch-ai-flow-framework/issues/42
if (this.future.isDone()) {
throw new IllegalStateException("Process Node [" + this.id + "] already executed.");
}
CompletableFuture.runAsync(() -> {
List<CompletableFuture<WorkflowData>> predFutures = predecessors.stream().map(p -> p.future()).collect(Collectors.toList());
if (!predecessors.isEmpty()) {
CompletableFuture<Void> waitForPredecessors = CompletableFuture.allOf(predFutures.toArray(new CompletableFuture<?>[0]));
try {
// We need timeouts to be part of the user template or in settings
// https://github.com/opensearch-project/opensearch-ai-flow-framework/issues/45
waitForPredecessors.orTimeout(30, TimeUnit.SECONDS).get();
} catch (InterruptedException | ExecutionException e) {
handleException(e);
return;
try {
if (!predecessors.isEmpty()) {
CompletableFuture<Void> waitForPredecessors = CompletableFuture.allOf(predFutures.toArray(new CompletableFuture<?>[0]));
waitForPredecessors.join();
}
}
logger.info(">>> Starting {}.", this.id);
// get the input data from predecessor(s)
List<WorkflowData> input = new ArrayList<WorkflowData>();
input.add(this.input);
for (CompletableFuture<WorkflowData> cf : predFutures) {
try {

logger.info("Starting {}.", this.id);
// get the input data from predecessor(s)
List<WorkflowData> input = new ArrayList<WorkflowData>();
input.add(this.input);
for (CompletableFuture<WorkflowData> cf : predFutures) {
input.add(cf.get());
} catch (InterruptedException | ExecutionException e) {
handleException(e);
return;
}
}
CompletableFuture<WorkflowData> stepFuture = this.workflowStep.execute(input);
try {
stepFuture.orTimeout(15, TimeUnit.SECONDS).join();
logger.info(">>> Finished {}.", this.id);

ScheduledCancellable delayExec = null;
if (this.nodeTimeout.compareTo(TimeValue.ZERO) > 0) {
delayExec = threadPool.schedule(() -> {
if (!future.isDone()) {
future.completeExceptionally(new TimeoutException("Execute timed out for " + this.id));
}
}, this.nodeTimeout, ThreadPool.Names.SAME);
}
CompletableFuture<WorkflowData> stepFuture = this.workflowStep.execute(input);
// If completed exceptionally, this is a no-op
future.complete(stepFuture.get());
} catch (InterruptedException | ExecutionException e) {
handleException(e);
if (delayExec != null) {
delayExec.cancel();
}
logger.info("Finished {}.", this.id);
} catch (Throwable e) {
// TODO: better handling of getCause
this.future.completeExceptionally(e);
}
}, executor);
// TODO: improve use of thread pool beyond generic
// https://github.com/opensearch-project/opensearch-ai-flow-framework/issues/61
}, threadPool.generic());
return this.future;
}

private void handleException(Exception e) {
// TODO: better handling of getCause
this.future.completeExceptionally(e);
logger.debug("<<< Completed Exceptionally {}", this.id, e.getCause());
}

@Override
public String toString() {
return this.id;
Expand Down
Loading

0 comments on commit 28326bd

Please sign in to comment.