Skip to content

Commit

Permalink
Introducing getExecutorBuilders extension point to FlowFramworkPlugin…
Browse files Browse the repository at this point in the history
…, added FixedExecutorBuilder thread pool for provisioning tasks, set up async workflow execution, added TODOs for state/GC index handling

Signed-off-by: Joshua Palis <[email protected]>
  • Loading branch information
joshpalis committed Oct 4, 2023
1 parent 0bceff6 commit cc32079
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

Expand All @@ -49,14 +51,23 @@
*/
public class FlowFrameworkPlugin extends Plugin implements ActionPlugin {

// TODO : Move names to common values class
/**
* The base URI for this plugin's rest actions
*/
public static final String AI_FLOW_FRAMEWORK_BASE_URI = "/_plugins/_ai_flow";
public static final String AI_FLOW_FRAMEWORK_BASE_URI = "/_plugins/_flow_framework";
/**
* The URI for this plugin's workflow rest actions
*/
public static final String WORKFLOWS_URI = AI_FLOW_FRAMEWORK_BASE_URI + "/workflows";
/**
* Flow Framework plugin thread pool name prefix
*/
public static final String FLOW_FRAMEWORK_THREAD_POOL_PREFIX = "thread_pool.flow_framework.";
/**
* The provision workflow thread pool name
*/
public static final String PROVISION_THREAD_POOL = "opensearch_workflow_provision";

/**
* Instantiate this plugin.
Expand Down Expand Up @@ -104,4 +115,18 @@ public List<RestHandler> getRestHandlers(
);
}

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
// TODO : Determine final size/queueSize values for the provision thread pool
FixedExecutorBuilder provisionThreadPool = new FixedExecutorBuilder(
settings,
PROVISION_THREAD_POOL,
1,
10,
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + PROVISION_THREAD_POOL,
false
);
return ImmutableList.of(provisionThreadPool);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,60 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.workflow.ProcessNode;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;

import static org.opensearch.flowframework.FlowFrameworkPlugin.PROVISION_THREAD_POOL;

/**
* Transport Action to provision a workflow from a stored use case template
*/
public class ProvisionWorkflowTransportAction extends HandledTransportAction<WorkflowRequest, WorkflowResponse> {

private final Logger logger = LogManager.getLogger(ProvisionWorkflowTransportAction.class);

// TODO : Move to common values class, pending implementation
/**
* The name of the provision workflow within the use case template
*/
private static final String PROVISION_WORKFLOW = "provision";

private final ThreadPool threadPool;
private final Client client;
private final WorkflowProcessSorter workflowProcessSorter;

/**
* Instantiates a new ProvisionWorkflowTransportAction
* @param transportService The TransportService
* @param actionFilters action filters
* @param threadPool The OpenSearch thread pool
* @param client The node client to retrieve a stored use case template
* @param workflowProcessSorter Utility class to generate a togologically sorted list of Process nodes
*/
@Inject
public ProvisionWorkflowTransportAction(TransportService transportService, ActionFilters actionFilters, Client client) {
public ProvisionWorkflowTransportAction(
TransportService transportService,
ActionFilters actionFilters,
ThreadPool threadPool,
Client client,
WorkflowProcessSorter workflowProcessSorter
) {
super(ProvisionWorkflowAction.NAME, transportService, actionFilters, WorkflowRequest::new);
this.threadPool = threadPool;
this.client = client;
this.workflowProcessSorter = workflowProcessSorter;
}

@Override
Expand All @@ -47,22 +81,86 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
String workflowId = workflowResponse.getWorkflowId();
Template template = request.getTemplate();

// TODO : Use node client to update state index, given workflowId
// TODO : Pass workflowId and template to execution
// TODO : Use node client to update state index to PROVISIONING, given workflowId

listener.onResponse(new WorkflowResponse(workflowId));

}, exception -> { listener.onFailure(exception); }));
// Asychronously begin provision workflow excecution
executeWorkflowAsync(workflowId, template.workflows().get(PROVISION_WORKFLOW));

}, exception -> { listener.onFailure(exception); }));
} else {

// Use case template has been previously saved, retrieve entry and execute
String workflowId = request.getWorkflowId();

// TODO : use node client to update state index, given workflowId
// TODO : Retrieve template from global context index using node client execute
// TODO : Retrieve template from global context index using node client
Template template = null; // temporary, remove later

// TODO : use node client to update state index entry to PROVISIONING, given workflowId

listener.onResponse(new WorkflowResponse(workflowId));
executeWorkflowAsync(workflowId, template.workflows().get(PROVISION_WORKFLOW));
}
}

/**
* Retrieves a thread from the provision thread pool to execute a workflow
* @param workflowId The id of the workflow
* @param workflow The workflow to execute
*/
private void executeWorkflowAsync(String workflowId, Workflow workflow) {
// TODO : Update Action listener type to State index Request
ActionListener<String> provisionWorkflowListener = ActionListener.wrap(response -> {
logger.info("Provisioning completed successuflly for workflow {}", workflowId);

// TODO : Create State index request to update STATE entry status to READY
}, exception -> {
logger.error("Provisioning failed for workflow {} : {}", workflowId, exception);

// TODO : Create State index request to update STATE entry status to FAILED
});
try {
threadPool.executor(PROVISION_THREAD_POOL).execute(() -> { executeWorkflow(workflow, provisionWorkflowListener); });
} catch (Exception exception) {
provisionWorkflowListener.onFailure(exception);
}
}

/**
* Topologically sorts a given workflow into a sequence of ProcessNodes and executes the workflow
* @param workflow The workflow to execute
* @param workflowListener The listener that updates the status of a workflow execution
*/
private void executeWorkflow(Workflow workflow, ActionListener<String> workflowListener) {

List<ProcessNode> processSequence = workflowProcessSorter.sortProcessNodes(workflow);
List<CompletableFuture<?>> workflowFutureList = new ArrayList<>();

for (ProcessNode processNode : processSequence) {
List<ProcessNode> predecessors = processNode.predecessors();

logger.info(
"Queueing process [{}].{}",
processNode.id(),
predecessors.isEmpty()
? " Can start immediately!"
: String.format(
Locale.getDefault(),
" Must wait for [%s] to complete first.",
predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", "))
)
);

workflowFutureList.add(processNode.execute());
}
try {
// Attempt to join each workflow step future, may throw a CompletionException if any step completes exceptionally
workflowFutureList.forEach(CompletableFuture::join);

// TODO : Create State Index request with provisioning state, start time, end time, etc, pending implementation. String for now
workflowListener.onResponse("READY");
} catch (CancellationException | CompletionException ex) {
workflowListener.onFailure(ex);
}
}

Expand Down

0 comments on commit cc32079

Please sign in to comment.