Skip to content

Commit

Permalink
[Backport 2.x] Fixing ProvisionWorkflowTransportAction, handling case…
Browse files Browse the repository at this point in the history
… in which a document does not exist (#102)

* Fixing ProvisionWorkflowTransportAction, handling case in which a document does not exist (#100)

* Fixing bug, handling case in which a GC entry does not exist prior to attempting to parse from source

Signed-off-by: Joshua Palis <[email protected]>

* Catching topological sorting exceptions when executing the workflow

Signed-off-by: Joshua Palis <[email protected]>

---------

Signed-off-by: Joshua Palis <[email protected]>
(cherry picked from commit 63ef780)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Signed-off-by: Joshua Palis <[email protected]>

* bumping version to 2.12

Signed-off-by: Joshua Palis <[email protected]>

---------

Signed-off-by: Joshua Palis <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Joshua Palis <[email protected]>
  • Loading branch information
3 people authored Oct 17, 2023
1 parent 19c8935 commit a653645
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 22 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ validateNebulaPom.enabled = false

buildscript {
ext {
opensearch_version = System.getProperty("opensearch.version", "2.11.0-SNAPSHOT")
opensearch_version = System.getProperty("opensearch.version", "2.12.0-SNAPSHOT")
buildVersionQualifier = System.getProperty("build.version_qualifier", "")
isSnapshot = "true" == System.getProperty("build.snapshot", "true")
version_tokens = opensearch_version.tokenize('-')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
client.get(getRequest, ActionListener.wrap(response -> {
context.restore();

if (!response.isExists()) {
listener.onFailure(
new FlowFrameworkException(
"Failed to retrieve template (" + workflowId + ") from global context.",
RestStatus.NOT_FOUND
)
);
return;
}

// Parse template from document source
Template template = Template.parseFromDocumentSource(response.getSourceAsString());

Expand Down Expand Up @@ -131,33 +141,38 @@ private void executeWorkflowAsync(String workflowId, Workflow workflow) {
* @param workflowListener The listener that updates the status of a workflow execution
*/
private void executeWorkflow(Workflow workflow, ActionListener<String> workflowListener) {

List<ProcessNode> processSequence = workflowProcessSorter.sortProcessNodes(workflow);
List<CompletableFuture<?>> workflowFutureList = new ArrayList<>();

for (ProcessNode processNode : processSequence) {
List<ProcessNode> predecessors = processNode.predecessors();

logger.info(
"Queueing process [{}].{}",
processNode.id(),
predecessors.isEmpty()
? " Can start immediately!"
: String.format(
Locale.getDefault(),
" Must wait for [%s] to complete first.",
predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", "))
)
);

workflowFutureList.add(processNode.execute());
}
try {

// Attempt to topologically sort the workflow graph
List<ProcessNode> processSequence = workflowProcessSorter.sortProcessNodes(workflow);
List<CompletableFuture<?>> workflowFutureList = new ArrayList<>();

for (ProcessNode processNode : processSequence) {
List<ProcessNode> predecessors = processNode.predecessors();

logger.info(
"Queueing process [{}].{}",
processNode.id(),
predecessors.isEmpty()
? " Can start immediately!"
: String.format(
Locale.getDefault(),
" Must wait for [%s] to complete first.",
predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", "))
)
);

workflowFutureList.add(processNode.execute());
}

// Attempt to join each workflow step future, may throw a CompletionException if any step completes exceptionally
workflowFutureList.forEach(CompletableFuture::join);

// TODO : Create State Index request with provisioning state, start time, end time, etc, pending implementation. String for now
workflowListener.onResponse("READY");

} catch (IllegalArgumentException e) {
workflowListener.onFailure(new FlowFrameworkException(e.getMessage(), RestStatus.BAD_REQUEST));
} catch (CancellationException | CompletionException ex) {
workflowListener.onFailure(new FlowFrameworkException(ex.getMessage(), RestStatus.INTERNAL_SERVER_ERROR));
}
Expand Down

0 comments on commit a653645

Please sign in to comment.