diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index 3c8b15f49..a5b0cea13 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -37,6 +37,8 @@ import org.opensearch.rest.RestController; import org.opensearch.rest.RestHandler; import org.opensearch.script.ScriptService; +import org.opensearch.threadpool.ExecutorBuilder; +import org.opensearch.threadpool.FixedExecutorBuilder; import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; @@ -49,14 +51,23 @@ */ public class FlowFrameworkPlugin extends Plugin implements ActionPlugin { + // TODO : Move names to common values class /** * The base URI for this plugin's rest actions */ - public static final String AI_FLOW_FRAMEWORK_BASE_URI = "/_plugins/_ai_flow"; + public static final String AI_FLOW_FRAMEWORK_BASE_URI = "/_plugins/_flow_framework"; /** * The URI for this plugin's workflow rest actions */ public static final String WORKFLOWS_URI = AI_FLOW_FRAMEWORK_BASE_URI + "/workflows"; + /** + * Flow Framework plugin thread pool name prefix + */ + public static final String FLOW_FRAMEWORK_THREAD_POOL_PREFIX = "thread_pool.flow_framework."; + /** + * The provision workflow thread pool name + */ + public static final String PROVISION_THREAD_POOL = "opensearch_workflow_provision"; /** * Instantiate this plugin. @@ -104,4 +115,18 @@ public List getRestHandlers( ); } + @Override + public List> getExecutorBuilders(Settings settings) { + // TODO : Determine final size/queueSize values for the provision thread pool + FixedExecutorBuilder provisionThreadPool = new FixedExecutorBuilder( + settings, + PROVISION_THREAD_POOL, + 1, + 10, + FLOW_FRAMEWORK_THREAD_POOL_PREFIX + PROVISION_THREAD_POOL, + false + ); + return ImmutableList.of(provisionThreadPool); + } + } diff --git a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java index 855905af2..bc30cae3a 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java @@ -16,26 +16,60 @@ import org.opensearch.common.inject.Inject; import org.opensearch.core.action.ActionListener; import org.opensearch.flowframework.model.Template; +import org.opensearch.flowframework.model.Workflow; +import org.opensearch.flowframework.workflow.ProcessNode; +import org.opensearch.flowframework.workflow.WorkflowProcessSorter; import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.stream.Collectors; + +import static org.opensearch.flowframework.FlowFrameworkPlugin.PROVISION_THREAD_POOL; + /** * Transport Action to provision a workflow from a stored use case template */ public class ProvisionWorkflowTransportAction extends HandledTransportAction { + private final Logger logger = LogManager.getLogger(ProvisionWorkflowTransportAction.class); + + // TODO : Move to common values class, pending implementation + /** + * The name of the provision workflow within the use case template + */ + private static final String PROVISION_WORKFLOW = "provision"; + + private final ThreadPool threadPool; private final Client client; + private final WorkflowProcessSorter workflowProcessSorter; /** * Instantiates a new ProvisionWorkflowTransportAction * @param transportService The TransportService * @param actionFilters action filters + * @param threadPool The OpenSearch thread pool * @param client The node client to retrieve a stored use case template + * @param workflowProcessSorter Utility class to generate a togologically sorted list of Process nodes */ @Inject - public ProvisionWorkflowTransportAction(TransportService transportService, ActionFilters actionFilters, Client client) { + public ProvisionWorkflowTransportAction( + TransportService transportService, + ActionFilters actionFilters, + ThreadPool threadPool, + Client client, + WorkflowProcessSorter workflowProcessSorter + ) { super(ProvisionWorkflowAction.NAME, transportService, actionFilters, WorkflowRequest::new); + this.threadPool = threadPool; this.client = client; + this.workflowProcessSorter = workflowProcessSorter; } @Override @@ -47,22 +81,86 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { listener.onFailure(exception); })); + // Asychronously begin provision workflow excecution + executeWorkflowAsync(workflowId, template.workflows().get(PROVISION_WORKFLOW)); + }, exception -> { listener.onFailure(exception); })); } else { - // Use case template has been previously saved, retrieve entry and execute String workflowId = request.getWorkflowId(); - // TODO : use node client to update state index, given workflowId - // TODO : Retrieve template from global context index using node client execute + // TODO : Retrieve template from global context index using node client + Template template = null; // temporary, remove later + + // TODO : use node client to update state index entry to PROVISIONING, given workflowId listener.onResponse(new WorkflowResponse(workflowId)); + executeWorkflowAsync(workflowId, template.workflows().get(PROVISION_WORKFLOW)); + } + } + + /** + * Retrieves a thread from the provision thread pool to execute a workflow + * @param workflowId The id of the workflow + * @param workflow The workflow to execute + */ + private void executeWorkflowAsync(String workflowId, Workflow workflow) { + // TODO : Update Action listener type to State index Request + ActionListener provisionWorkflowListener = ActionListener.wrap(response -> { + logger.info("Provisioning completed successuflly for workflow {}", workflowId); + + // TODO : Create State index request to update STATE entry status to READY + }, exception -> { + logger.error("Provisioning failed for workflow {} : {}", workflowId, exception); + + // TODO : Create State index request to update STATE entry status to FAILED + }); + try { + threadPool.executor(PROVISION_THREAD_POOL).execute(() -> { executeWorkflow(workflow, provisionWorkflowListener); }); + } catch (Exception exception) { + provisionWorkflowListener.onFailure(exception); + } + } + + /** + * Topologically sorts a given workflow into a sequence of ProcessNodes and executes the workflow + * @param workflow The workflow to execute + * @param workflowListener The listener that updates the status of a workflow execution + */ + private void executeWorkflow(Workflow workflow, ActionListener workflowListener) { + + List processSequence = workflowProcessSorter.sortProcessNodes(workflow); + List> workflowFutureList = new ArrayList<>(); + + for (ProcessNode processNode : processSequence) { + List predecessors = processNode.predecessors(); + + logger.info( + "Queueing process [{}].{}", + processNode.id(), + predecessors.isEmpty() + ? " Can start immediately!" + : String.format( + Locale.getDefault(), + " Must wait for [%s] to complete first.", + predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", ")) + ) + ); + + workflowFutureList.add(processNode.execute()); + } + try { + // Attempt to join each workflow step future, may throw a CompletionException if any step completes exceptionally + workflowFutureList.forEach(CompletableFuture::join); + + // TODO : Create State Index request with provisioning state, start time, end time, etc, pending implementation. String for now + workflowListener.onResponse("READY"); + } catch (CancellationException | CompletionException ex) { + workflowListener.onFailure(ex); } }