From 987b96ff5190d3f0c698968aa2be15131a58afee Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Fri, 9 Feb 2024 17:00:50 -0800 Subject: [PATCH] [Backport 2.12] Improve provisioning exceptions with step ID and hide exception details (#519) Improve provisioning exceptions with step ID and hide exception details (#515) (cherry picked from commit 861616ac07d0df94d455609530f0e03a078d8fdd) Signed-off-by: Daniel Widdis Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] Co-authored-by: Joshua Palis --- .../indices/FlowFrameworkIndicesHandler.java | 5 ++-- .../ProvisionWorkflowTransportAction.java | 29 +++++++++---------- 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java index 1bc6d75c5..76c543883 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -522,11 +522,10 @@ public void updateFlowFrameworkSystemIndexDoc( } else { try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { UpdateRequest updateRequest = new UpdateRequest(WORKFLOW_STATE_INDEX, documentId); - Map updatedContent = new HashMap<>(); - updatedContent.putAll(updatedFields); + Map updatedContent = new HashMap<>(updatedFields); updateRequest.doc(updatedContent); updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - updateRequest.retryOnConflict(3); + updateRequest.retryOnConflict(5); // TODO: decide what condition can be considered as an update conflict and add retry strategy client.update(updateRequest, ActionListener.runBefore(listener, context::restore)); } catch (Exception e) { diff --git a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java index 2ed203c63..efa4b8e6b 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java @@ -35,12 +35,11 @@ import org.opensearch.transport.TransportService; import java.time.Instant; -import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.concurrent.CancellationException; import java.util.stream.Collectors; import static org.opensearch.flowframework.common.CommonValue.ERROR_FIELD; @@ -192,9 +191,9 @@ private void executeWorkflowAsync(String workflowId, List workflowS * @param workflowId The workflowId associated with the workflow that is executing */ private void executeWorkflow(List workflowSequence, String workflowId) { + String currentStepId = ""; try { - - List> workflowFutureList = new ArrayList<>(); + Map> workflowFutureMap = new LinkedHashMap<>(); for (ProcessNode processNode : workflowSequence) { List predecessors = processNode.predecessors(); @@ -210,11 +209,14 @@ private void executeWorkflow(List workflowSequence, String workflow ) ); - workflowFutureList.add(processNode.execute()); + workflowFutureMap.put(processNode.id(), processNode.execute()); } - // Attempt to join each workflow step future, may throw a ExecutionException if any step completes exceptionally - workflowFutureList.forEach(PlainActionFuture::actionGet); + // Attempt to complete each workflow step future, may throw a ExecutionException if any step completes exceptionally + for (Map.Entry> e : workflowFutureMap.entrySet()) { + currentStepId = e.getKey(); + e.getValue().actionGet(); + } logger.info("Provisioning completed successfully for workflow {}", workflowId); flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc( @@ -229,15 +231,10 @@ private void executeWorkflow(List workflowSequence, String workflow }, exception -> { logger.error("Failed to update workflow state : {}", exception.getMessage(), exception); }) ); } catch (Exception ex) { - logger.error("Provisioning failed for workflow: {}", workflowId, ex); - String errorMessage; - if (ex instanceof CancellationException) { - errorMessage = "A step in the workflow was cancelled."; - } else if (ex.getCause() != null) { - errorMessage = ex.getCause().getMessage(); - } else { - errorMessage = ex.getMessage(); - } + logger.error("Provisioning failed for workflow {} during step {}.", workflowId, currentStepId, ex); + String errorMessage = (ex.getCause() == null ? ex.getClass().getName() : ex.getCause().getClass().getName()) + + " during step " + + currentStepId; flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc( workflowId, Map.ofEntries(