Skip to content

Commit

Permalink
Fixing ProvisionWorkflowTransportAction, handling case in which a doc…
Browse files Browse the repository at this point in the history
…ument does not exist (opensearch-project#100)

* Fixing bug, handling case in which a GC entry does not exist prior to attempting to parse from source

Signed-off-by: Joshua Palis <[email protected]>

* Catching topological sorting exceptions when executing the workflow

Signed-off-by: Joshua Palis <[email protected]>

---------

Signed-off-by: Joshua Palis <[email protected]>
  • Loading branch information
joshpalis authored Oct 17, 2023
1 parent c84321f commit 63ef780
Showing 1 changed file with 36 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
client.get(getRequest, ActionListener.wrap(response -> {
context.restore();

if (!response.isExists()) {
listener.onFailure(
new FlowFrameworkException(
"Failed to retrieve template (" + workflowId + ") from global context.",
RestStatus.NOT_FOUND
)
);
return;
}

// Parse template from document source
Template template = Template.parseFromDocumentSource(response.getSourceAsString());

Expand Down Expand Up @@ -131,33 +141,38 @@ private void executeWorkflowAsync(String workflowId, Workflow workflow) {
* @param workflowListener The listener that updates the status of a workflow execution
*/
private void executeWorkflow(Workflow workflow, ActionListener<String> workflowListener) {

List<ProcessNode> processSequence = workflowProcessSorter.sortProcessNodes(workflow);
List<CompletableFuture<?>> workflowFutureList = new ArrayList<>();

for (ProcessNode processNode : processSequence) {
List<ProcessNode> predecessors = processNode.predecessors();

logger.info(
"Queueing process [{}].{}",
processNode.id(),
predecessors.isEmpty()
? " Can start immediately!"
: String.format(
Locale.getDefault(),
" Must wait for [%s] to complete first.",
predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", "))
)
);

workflowFutureList.add(processNode.execute());
}
try {

// Attempt to topologically sort the workflow graph
List<ProcessNode> processSequence = workflowProcessSorter.sortProcessNodes(workflow);
List<CompletableFuture<?>> workflowFutureList = new ArrayList<>();

for (ProcessNode processNode : processSequence) {
List<ProcessNode> predecessors = processNode.predecessors();

logger.info(
"Queueing process [{}].{}",
processNode.id(),
predecessors.isEmpty()
? " Can start immediately!"
: String.format(
Locale.getDefault(),
" Must wait for [%s] to complete first.",
predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", "))
)
);

workflowFutureList.add(processNode.execute());
}

// Attempt to join each workflow step future, may throw a CompletionException if any step completes exceptionally
workflowFutureList.forEach(CompletableFuture::join);

// TODO : Create State Index request with provisioning state, start time, end time, etc, pending implementation. String for now
workflowListener.onResponse("READY");

} catch (IllegalArgumentException e) {
workflowListener.onFailure(new FlowFrameworkException(e.getMessage(), RestStatus.BAD_REQUEST));
} catch (CancellationException | CompletionException ex) {
workflowListener.onFailure(new FlowFrameworkException(ex.getMessage(), RestStatus.INTERNAL_SERVER_ERROR));
}
Expand Down

0 comments on commit 63ef780

Please sign in to comment.