diff --git a/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java b/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java index 3184c9efd..2244fe01f 100644 --- a/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java +++ b/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java @@ -37,6 +37,9 @@ public enum WorkflowResources { private final String workflowStep; private final String resourceCreated; private static final Logger logger = LogManager.getLogger(WorkflowResources.class); + private static final Set allResources = Stream.of(values()) + .map(WorkflowResources::getResourceCreated) + .collect(Collectors.toSet()); WorkflowResources(String workflowStep, String resourceCreated) { this.workflowStep = workflowStep; @@ -82,6 +85,6 @@ public static String getResourceByWorkflowStep(String workflowStep) throws IOExc * @return a set of all the resource created types */ public static Set getAllResourcesCreated() { - return Stream.of(values()).map(WorkflowResources::getResourceCreated).collect(Collectors.toSet()); + return allResources; } } diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java index eb7571590..0f429ba20 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -41,7 +41,6 @@ import org.opensearch.flowframework.model.Template; import org.opensearch.flowframework.model.WorkflowState; import org.opensearch.flowframework.util.EncryptorUtils; -import org.opensearch.flowframework.workflow.WorkflowData; import org.opensearch.script.Script; import org.opensearch.script.ScriptType; @@ -50,7 +49,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR; @@ -492,7 +490,7 @@ public void updateFlowFrameworkSystemIndexDocWithScript( * @param nodeId WorkflowData object with relevent step information * @param workflowStepName the workflowstep name that created the resource * @param resourceId the id of the newly created resource - * @param completableFuture the CompletableFuture used for this step + * @param listener the ActionListener for this step to handle completing the future after update * @throws IOException if parsing fails on new resource */ public void updateResourceInStateIndex( @@ -500,7 +498,7 @@ public void updateResourceInStateIndex( String nodeId, String workflowStepName, String resourceId, - CompletableFuture completableFuture + ActionListener listener ) throws IOException { ResourceCreated newResource = new ResourceCreated(workflowStepName, nodeId, resourceId); XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent()); @@ -516,10 +514,7 @@ public void updateResourceInStateIndex( updateFlowFrameworkSystemIndexDocWithScript(WORKFLOW_STATE_INDEX, workflowId, script, ActionListener.wrap(updateResponse -> { logger.info("updated resources created of {}", workflowId); - }, exception -> { - completableFuture.completeExceptionally(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))); - logger.error("Failed to update workflow state with newly created resource", exception); - })); - + listener.onResponse(updateResponse); + }, exception -> { listener.onFailure(exception); })); } } diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java index 29ab43568..dc4c83d4e 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateConnectorStep.java @@ -84,20 +84,27 @@ public void onResponse(MLCreateConnectorResponse mlCreateConnectorResponse) { try { String resourceName = WorkflowResources.getResourceByWorkflowStep(getName()); - createConnectorFuture.complete( - new WorkflowData( - Map.ofEntries(Map.entry(resourceName, mlCreateConnectorResponse.getConnectorId())), - currentNodeInputs.getWorkflowId(), - currentNodeId - ) - ); logger.info("Created connector successfully"); flowFrameworkIndicesHandler.updateResourceInStateIndex( currentNodeInputs.getWorkflowId(), currentNodeId, getName(), mlCreateConnectorResponse.getConnectorId(), - createConnectorFuture + ActionListener.wrap(response -> { + logger.info("successfully updated resources created in state index: {}", response.getIndex()); + createConnectorFuture.complete( + new WorkflowData( + Map.ofEntries(Map.entry(resourceName, mlCreateConnectorResponse.getConnectorId())), + currentNodeInputs.getWorkflowId(), + currentNodeId + ) + ); + }, exception -> { + logger.error("Failed to update new created resource", exception); + createConnectorFuture.completeExceptionally( + new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)) + ); + }) ); } catch (Exception e) { diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java index 3c2badb42..0ace57dc3 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java @@ -73,19 +73,26 @@ public void onResponse(CreateIndexResponse createIndexResponse) { try { String resourceName = WorkflowResources.getResourceByWorkflowStep(getName()); logger.info("created index: {}", createIndexResponse.index()); - createIndexFuture.complete( - new WorkflowData( - Map.of(resourceName, createIndexResponse.index()), - currentNodeInputs.getWorkflowId(), - currentNodeId - ) - ); flowFrameworkIndicesHandler.updateResourceInStateIndex( currentNodeInputs.getWorkflowId(), currentNodeId, getName(), createIndexResponse.index(), - createIndexFuture + ActionListener.wrap(response -> { + logger.info("successfully updated resource created in state index: {}", response.getIndex()); + createIndexFuture.complete( + new WorkflowData( + Map.of(resourceName, createIndexResponse.index()), + currentNodeInputs.getWorkflowId(), + currentNodeId + ) + ); + }, exception -> { + logger.error("Failed to update new created resource", exception); + createIndexFuture.completeExceptionally( + new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)) + ); + }) ); } catch (Exception e) { logger.error("Failed to parse and update new created resource", e); diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java index f5d9b6a17..352772a49 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java @@ -144,21 +144,28 @@ public CompletableFuture execute( try { String resourceName = WorkflowResources.getResourceByWorkflowStep(getName()); - // PutPipelineRequest returns only an AcknowledgeResponse, returning pipelineId instead - // TODO: revisit this concept of pipeline_id to be consistent with what makes most sense to end user here - createIngestPipelineFuture.complete( - new WorkflowData( - Map.of(resourceName, putPipelineRequest.getId()), - currentNodeInputs.getWorkflowId(), - currentNodeInputs.getNodeId() - ) - ); flowFrameworkIndicesHandler.updateResourceInStateIndex( currentNodeInputs.getWorkflowId(), currentNodeId, getName(), putPipelineRequest.getId(), - createIngestPipelineFuture + ActionListener.wrap(updateResponse -> { + logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex()); + // PutPipelineRequest returns only an AcknowledgeResponse, returning pipelineId instead + // TODO: revisit this concept of pipeline_id to be consistent with what makes most sense to end user here + createIngestPipelineFuture.complete( + new WorkflowData( + Map.of(resourceName, putPipelineRequest.getId()), + currentNodeInputs.getWorkflowId(), + currentNodeInputs.getNodeId() + ) + ); + }, exception -> { + logger.error("Failed to update new created resource", exception); + createIngestPipelineFuture.completeExceptionally( + new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)) + ); + }) ); } catch (Exception e) { diff --git a/src/main/java/org/opensearch/flowframework/workflow/ModelGroupStep.java b/src/main/java/org/opensearch/flowframework/workflow/ModelGroupStep.java index e32f2bb71..50ae30986 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/ModelGroupStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/ModelGroupStep.java @@ -75,22 +75,29 @@ public void onResponse(MLRegisterModelGroupResponse mlRegisterModelGroupResponse try { logger.info("Remote Model registration successful"); String resourceName = WorkflowResources.getResourceByWorkflowStep(getName()); - registerModelGroupFuture.complete( - new WorkflowData( - Map.ofEntries( - Map.entry(resourceName, mlRegisterModelGroupResponse.getModelGroupId()), - Map.entry(MODEL_GROUP_STATUS, mlRegisterModelGroupResponse.getStatus()) - ), - currentNodeInputs.getWorkflowId(), - currentNodeId - ) - ); flowFrameworkIndicesHandler.updateResourceInStateIndex( currentNodeInputs.getWorkflowId(), currentNodeId, getName(), mlRegisterModelGroupResponse.getModelGroupId(), - registerModelGroupFuture + ActionListener.wrap(updateResponse -> { + logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex()); + registerModelGroupFuture.complete( + new WorkflowData( + Map.ofEntries( + Map.entry(resourceName, mlRegisterModelGroupResponse.getModelGroupId()), + Map.entry(MODEL_GROUP_STATUS, mlRegisterModelGroupResponse.getStatus()) + ), + currentNodeInputs.getWorkflowId(), + currentNodeId + ) + ); + }, exception -> { + logger.error("Failed to update new created resource", exception); + registerModelGroupFuture.completeExceptionally( + new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)) + ); + }) ); } catch (Exception e) { diff --git a/src/main/java/org/opensearch/flowframework/workflow/RegisterLocalModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/RegisterLocalModelStep.java index e7ac4fb5b..3dc730b54 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/RegisterLocalModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/RegisterLocalModelStep.java @@ -255,22 +255,29 @@ void retryableGetMlTask( try { logger.info("Local Model registration successful"); String resourceName = WorkflowResources.getResourceByWorkflowStep(getName()); - registerLocalModelFuture.complete( - new WorkflowData( - Map.ofEntries( - Map.entry(resourceName, response.getModelId()), - Map.entry(REGISTER_MODEL_STATUS, response.getState().name()) - ), - workflowId, - nodeId - ) - ); flowFrameworkIndicesHandler.updateResourceInStateIndex( workflowId, nodeId, getName(), response.getTaskId(), - registerLocalModelFuture + ActionListener.wrap(updateResponse -> { + logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex()); + registerLocalModelFuture.complete( + new WorkflowData( + Map.ofEntries( + Map.entry(resourceName, response.getModelId()), + Map.entry(REGISTER_MODEL_STATUS, response.getState().name()) + ), + workflowId, + nodeId + ) + ); + }, exception -> { + logger.error("Failed to update new created resource", exception); + registerLocalModelFuture.completeExceptionally( + new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)) + ); + }) ); } catch (Exception e) { diff --git a/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java index b43e859e1..7e33937bc 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/RegisterRemoteModelStep.java @@ -77,22 +77,29 @@ public void onResponse(MLRegisterModelResponse mlRegisterModelResponse) { try { logger.info("Remote Model registration successful"); String resourceName = WorkflowResources.getResourceByWorkflowStep(getName()); - registerRemoteModelFuture.complete( - new WorkflowData( - Map.ofEntries( - Map.entry(resourceName, mlRegisterModelResponse.getModelId()), - Map.entry(REGISTER_MODEL_STATUS, mlRegisterModelResponse.getStatus()) - ), - currentNodeInputs.getWorkflowId(), - currentNodeInputs.getNodeId() - ) - ); flowFrameworkIndicesHandler.updateResourceInStateIndex( currentNodeInputs.getWorkflowId(), currentNodeId, getName(), mlRegisterModelResponse.getModelId(), - registerRemoteModelFuture + ActionListener.wrap(response -> { + logger.info("successfully updated resources created in state index: {}", response.getIndex()); + registerRemoteModelFuture.complete( + new WorkflowData( + Map.ofEntries( + Map.entry(resourceName, mlRegisterModelResponse.getModelId()), + Map.entry(REGISTER_MODEL_STATUS, mlRegisterModelResponse.getStatus()) + ), + currentNodeInputs.getWorkflowId(), + currentNodeInputs.getNodeId() + ) + ); + }, exception -> { + logger.error("Failed to update new created resource", exception); + registerRemoteModelFuture.completeExceptionally( + new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)) + ); + }) ); } catch (Exception e) {