From f60d0459dbfbb419e398d1bfd7b100eec4caddec Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Tue, 17 Oct 2023 17:30:35 +0000 Subject: [PATCH 1/2] Fixing ProvisionWorkflowTransportAction, handling case in which a document does not exist (#100) * Fixing bug, handling case in which a GC entry does not exist prior to attempting to parse from source Signed-off-by: Joshua Palis * Catching topological sorting exceptions when executing the workflow Signed-off-by: Joshua Palis --------- Signed-off-by: Joshua Palis (cherry picked from commit 63ef780ba6bd9edd6286b38020e2a34956ab5c1f) Signed-off-by: github-actions[bot] Signed-off-by: Joshua Palis --- .../ProvisionWorkflowTransportAction.java | 57 ++++++++++++------- 1 file changed, 36 insertions(+), 21 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java index 0dbec5bf2..e03a1b4d8 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java @@ -84,6 +84,16 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { context.restore(); + if (!response.isExists()) { + listener.onFailure( + new FlowFrameworkException( + "Failed to retrieve template (" + workflowId + ") from global context.", + RestStatus.NOT_FOUND + ) + ); + return; + } + // Parse template from document source Template template = Template.parseFromDocumentSource(response.getSourceAsString()); @@ -131,33 +141,38 @@ private void executeWorkflowAsync(String workflowId, Workflow workflow) { * @param workflowListener The listener that updates the status of a workflow execution */ private void executeWorkflow(Workflow workflow, ActionListener workflowListener) { - - List processSequence = workflowProcessSorter.sortProcessNodes(workflow); - List> workflowFutureList = new ArrayList<>(); - - for (ProcessNode processNode : processSequence) { - List predecessors = processNode.predecessors(); - - logger.info( - "Queueing process [{}].{}", - processNode.id(), - predecessors.isEmpty() - ? " Can start immediately!" - : String.format( - Locale.getDefault(), - " Must wait for [%s] to complete first.", - predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", ")) - ) - ); - - workflowFutureList.add(processNode.execute()); - } try { + + // Attempt to topologically sort the workflow graph + List processSequence = workflowProcessSorter.sortProcessNodes(workflow); + List> workflowFutureList = new ArrayList<>(); + + for (ProcessNode processNode : processSequence) { + List predecessors = processNode.predecessors(); + + logger.info( + "Queueing process [{}].{}", + processNode.id(), + predecessors.isEmpty() + ? " Can start immediately!" + : String.format( + Locale.getDefault(), + " Must wait for [%s] to complete first.", + predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", ")) + ) + ); + + workflowFutureList.add(processNode.execute()); + } + // Attempt to join each workflow step future, may throw a CompletionException if any step completes exceptionally workflowFutureList.forEach(CompletableFuture::join); // TODO : Create State Index request with provisioning state, start time, end time, etc, pending implementation. String for now workflowListener.onResponse("READY"); + + } catch (IllegalArgumentException e) { + workflowListener.onFailure(new FlowFrameworkException(e.getMessage(), RestStatus.BAD_REQUEST)); } catch (CancellationException | CompletionException ex) { workflowListener.onFailure(new FlowFrameworkException(ex.getMessage(), RestStatus.INTERNAL_SERVER_ERROR)); } From 7ca967b730de429e9a5c1c759d2781337c3ff09a Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Tue, 17 Oct 2023 18:10:36 +0000 Subject: [PATCH 2/2] bumping version to 2.12 Signed-off-by: Joshua Palis --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 980cb11d9..9a8d94b54 100644 --- a/build.gradle +++ b/build.gradle @@ -40,7 +40,7 @@ validateNebulaPom.enabled = false buildscript { ext { - opensearch_version = System.getProperty("opensearch.version", "2.11.0-SNAPSHOT") + opensearch_version = System.getProperty("opensearch.version", "2.12.0-SNAPSHOT") buildVersionQualifier = System.getProperty("build.version_qualifier", "") isSnapshot = "true" == System.getProperty("build.snapshot", "true") version_tokens = opensearch_version.tokenize('-')