Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Fixing ProvisionWorkflowTransportAction, handling case in which a document does not exist #102

Merged
merged 2 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ validateNebulaPom.enabled = false

buildscript {
ext {
opensearch_version = System.getProperty("opensearch.version", "2.11.0-SNAPSHOT")
opensearch_version = System.getProperty("opensearch.version", "2.12.0-SNAPSHOT")
buildVersionQualifier = System.getProperty("build.version_qualifier", "")
isSnapshot = "true" == System.getProperty("build.snapshot", "true")
version_tokens = opensearch_version.tokenize('-')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@
client.get(getRequest, ActionListener.wrap(response -> {
context.restore();

if (!response.isExists()) {
listener.onFailure(

Check warning on line 88 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L88

Added line #L88 was not covered by tests
new FlowFrameworkException(
"Failed to retrieve template (" + workflowId + ") from global context.",
RestStatus.NOT_FOUND
)
);
return;

Check warning on line 94 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L94

Added line #L94 was not covered by tests
}

// Parse template from document source
Template template = Template.parseFromDocumentSource(response.getSourceAsString());

Expand Down Expand Up @@ -131,33 +141,38 @@
* @param workflowListener The listener that updates the status of a workflow execution
*/
private void executeWorkflow(Workflow workflow, ActionListener<String> workflowListener) {

List<ProcessNode> processSequence = workflowProcessSorter.sortProcessNodes(workflow);
List<CompletableFuture<?>> workflowFutureList = new ArrayList<>();

for (ProcessNode processNode : processSequence) {
List<ProcessNode> predecessors = processNode.predecessors();

logger.info(
"Queueing process [{}].{}",
processNode.id(),
predecessors.isEmpty()
? " Can start immediately!"
: String.format(
Locale.getDefault(),
" Must wait for [%s] to complete first.",
predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", "))
)
);

workflowFutureList.add(processNode.execute());
}
try {

// Attempt to topologically sort the workflow graph
List<ProcessNode> processSequence = workflowProcessSorter.sortProcessNodes(workflow);
List<CompletableFuture<?>> workflowFutureList = new ArrayList<>();

Check warning on line 148 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L147-L148

Added lines #L147 - L148 were not covered by tests

for (ProcessNode processNode : processSequence) {
List<ProcessNode> predecessors = processNode.predecessors();

Check warning on line 151 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L151

Added line #L151 was not covered by tests

logger.info(

Check warning on line 153 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L153

Added line #L153 was not covered by tests
"Queueing process [{}].{}",
processNode.id(),

Check warning on line 155 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L155

Added line #L155 was not covered by tests
predecessors.isEmpty()
? " Can start immediately!"
: String.format(
Locale.getDefault(),

Check warning on line 159 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L157-L159

Added lines #L157 - L159 were not covered by tests
" Must wait for [%s] to complete first.",
predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", "))

Check warning on line 161 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L161

Added line #L161 was not covered by tests
)
);

workflowFutureList.add(processNode.execute());
}

Check warning on line 166 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L165-L166

Added lines #L165 - L166 were not covered by tests

// Attempt to join each workflow step future, may throw a CompletionException if any step completes exceptionally
workflowFutureList.forEach(CompletableFuture::join);

// TODO : Create State Index request with provisioning state, start time, end time, etc, pending implementation. String for now
workflowListener.onResponse("READY");

} catch (IllegalArgumentException e) {
workflowListener.onFailure(new FlowFrameworkException(e.getMessage(), RestStatus.BAD_REQUEST));

Check warning on line 175 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L174-L175

Added lines #L174 - L175 were not covered by tests
} catch (CancellationException | CompletionException ex) {
workflowListener.onFailure(new FlowFrameworkException(ex.getMessage(), RestStatus.INTERNAL_SERVER_ERROR));
}
Expand Down
Loading