From 7a7d0b165e4c8df8a861527be9ba520eb813a3bf Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Fri, 12 Jul 2024 08:41:49 -0700 Subject: [PATCH] Add ability to more easily update WorkflowState Signed-off-by: Daniel Widdis --- .../indices/FlowFrameworkIndicesHandler.java | 3 +- .../flowframework/model/WorkflowState.java | 67 ++++++++++++++++- .../transport/GetWorkflowStateResponse.java | 3 +- .../model/WorkflowStateTests.java | 73 +++++++++++++++++++ 4 files changed, 142 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java index 8fcfd1207..18f0a9780 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -357,7 +357,8 @@ public void initializeConfigIndex(ActionListener listener) { * @param listener action listener */ public void putInitialStateToWorkflowState(String workflowId, User user, ActionListener listener) { - WorkflowState state = new WorkflowState.Builder().workflowId(workflowId) + WorkflowState state = WorkflowState.builder() + .workflowId(workflowId) .state(State.NOT_STARTED.name()) .provisioningProgress(ProvisioningProgress.NOT_STARTED.name()) .user(user) diff --git a/src/main/java/org/opensearch/flowframework/model/WorkflowState.java b/src/main/java/org/opensearch/flowframework/model/WorkflowState.java index 6b1593b6e..6a4b81a55 100644 --- a/src/main/java/org/opensearch/flowframework/model/WorkflowState.java +++ b/src/main/java/org/opensearch/flowframework/model/WorkflowState.java @@ -126,6 +126,15 @@ public static Builder builder() { return new Builder(); } + /** + * Constructs a builder object for workflowState from an existing state + * @param existingState a WorkflowState object to initialize the builder with + * @return Builder Object initialized with existing state + */ + public static Builder builder(WorkflowState existingState) { + return new Builder(existingState); + } + /** * Class for constructing a Builder for WorkflowState */ @@ -143,7 +152,23 @@ public static class Builder { /** * Empty Constructor for the Builder object */ - public Builder() {} + private Builder() {} + + /** + * Builder from existing state + * @param existingState a WorkflowState object to initialize the builder with + */ + private Builder(WorkflowState existingState) { + this.workflowId = existingState.getWorkflowId(); + this.error = existingState.getError(); + this.state = existingState.getState(); + this.provisioningProgress = existingState.getProvisioningProgress(); + this.provisionStartTime = existingState.getProvisionStartTime(); + this.provisionEndTime = existingState.getProvisionEndTime(); + this.user = existingState.getUser(); + this.userOutputs = existingState.userOutputs(); + this.resourcesCreated = existingState.resourcesCreated(); + } /** * Builder method for adding workflowID @@ -254,6 +279,44 @@ public WorkflowState build() { } } + /** + * Merges two workflow states by updating the fields from an existing state with the (non-null) fields of another one. + * @param existingState An existing Workflow state. + * @param stateWithNewFields A workflow state containing only fields to update. + * @return the updated workflow state. + */ + public static WorkflowState updateExistingWorkflowState(WorkflowState existingState, WorkflowState stateWithNewFields) { + Builder builder = WorkflowState.builder(existingState); + if (stateWithNewFields.getWorkflowId() != null) { + builder.workflowId(stateWithNewFields.getWorkflowId()); + } + if (stateWithNewFields.getError() != null) { + builder.error(stateWithNewFields.getError()); + } + if (stateWithNewFields.getState() != null) { + builder.state(stateWithNewFields.getState()); + } + if (stateWithNewFields.getProvisioningProgress() != null) { + builder.provisioningProgress(stateWithNewFields.getProvisioningProgress()); + } + if (stateWithNewFields.getProvisionStartTime() != null) { + builder.provisionStartTime(stateWithNewFields.getProvisionStartTime()); + } + if (stateWithNewFields.getProvisionEndTime() != null) { + builder.provisionEndTime(stateWithNewFields.getProvisionEndTime()); + } + if (stateWithNewFields.getUser() != null) { + builder.user(stateWithNewFields.getUser()); + } + if (stateWithNewFields.userOutputs() != null) { + builder.userOutputs(stateWithNewFields.userOutputs()); + } + if (stateWithNewFields.resourcesCreated() != null) { + builder.resourcesCreated(stateWithNewFields.resourcesCreated()); + } + return builder.build(); + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { XContentBuilder xContentBuilder = builder.startObject(); @@ -492,7 +555,7 @@ public Map userOutputs() { } /** - * A map of all the resources created + * A list of all the resources created * @return the resources created */ public List resourcesCreated() { diff --git a/src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateResponse.java b/src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateResponse.java index 6f8b9e14b..517126c2b 100644 --- a/src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateResponse.java +++ b/src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateResponse.java @@ -48,7 +48,8 @@ public GetWorkflowStateResponse(WorkflowState workflowState, boolean allStatus) if (allStatus) { this.workflowState = workflowState; } else { - this.workflowState = new WorkflowState.Builder().workflowId(workflowState.getWorkflowId()) + this.workflowState = WorkflowState.builder() + .workflowId(workflowState.getWorkflowId()) .error(workflowState.getError()) .state(workflowState.getState()) .resourcesCreated(workflowState.resourcesCreated()) diff --git a/src/test/java/org/opensearch/flowframework/model/WorkflowStateTests.java b/src/test/java/org/opensearch/flowframework/model/WorkflowStateTests.java index 04c3655d3..ca7873036 100644 --- a/src/test/java/org/opensearch/flowframework/model/WorkflowStateTests.java +++ b/src/test/java/org/opensearch/flowframework/model/WorkflowStateTests.java @@ -90,4 +90,77 @@ public void testWorkflowState() throws IOException { } } + public void testWorkflowStateUpdate() { + // Time travel to guarantee update increments + Instant now = Instant.now().minusMillis(100); + + WorkflowState wfs = WorkflowState.builder() + .workflowId("1") + .error("error one") + .state("state one") + .provisioningProgress("progress one") + .provisionStartTime(now) + .provisionEndTime(now) + .user(new User("one", Collections.emptyList(), Collections.emptyList(), Collections.emptyList())) + .userOutputs(Map.of("output", "one")) + .resourcesCreated(List.of(new ResourceCreated("", "", "", "id one"))) + .build(); + + assertEquals("1", wfs.getWorkflowId()); + assertEquals("error one", wfs.getError()); + assertEquals("state one", wfs.getState()); + assertEquals("progress one", wfs.getProvisioningProgress()); + assertEquals(now, wfs.getProvisionStartTime()); + assertEquals(now, wfs.getProvisionEndTime()); + assertEquals("one", wfs.getUser().getName()); + assertEquals(1, wfs.userOutputs().size()); + assertEquals("one", wfs.userOutputs().get("output")); + assertEquals(1, wfs.resourcesCreated().size()); + ResourceCreated rc = wfs.resourcesCreated().get(0); + assertEquals("id one", rc.resourceId()); + + WorkflowState update = WorkflowState.builder() + .workflowId("2") + .error("error two") + .state("state two") + .provisioningProgress("progress two") + .user(new User("two", Collections.emptyList(), Collections.emptyList(), Collections.emptyList())) + .build(); + + wfs = WorkflowState.updateExistingWorkflowState(wfs, update); + assertEquals("2", wfs.getWorkflowId()); + assertEquals("error two", wfs.getError()); + assertEquals("state two", wfs.getState()); + assertEquals("progress two", wfs.getProvisioningProgress()); + assertEquals(now, wfs.getProvisionStartTime()); + assertEquals(now, wfs.getProvisionEndTime()); + assertEquals("two", wfs.getUser().getName()); + assertEquals(1, wfs.userOutputs().size()); + assertEquals("one", wfs.userOutputs().get("output")); + assertEquals(1, wfs.resourcesCreated().size()); + rc = wfs.resourcesCreated().get(0); + assertEquals("id one", rc.resourceId()); + + now = Instant.now().minusMillis(100); + update = WorkflowState.builder() + .provisionStartTime(now) + .provisionEndTime(now) + .userOutputs(Map.of("output", "two")) + .resourcesCreated(List.of(wfs.resourcesCreated().get(0), new ResourceCreated("", "", "", "id two"))) + .build(); + + wfs = WorkflowState.updateExistingWorkflowState(wfs, update); + assertEquals("2", wfs.getWorkflowId()); + assertEquals("error two", wfs.getError()); + assertEquals("state two", wfs.getState()); + assertEquals("progress two", wfs.getProvisioningProgress()); + assertEquals(now, wfs.getProvisionStartTime()); + assertEquals(now, wfs.getProvisionEndTime()); + assertEquals("two", wfs.getUser().getName()); + assertEquals(1, wfs.userOutputs().size()); + assertEquals("two", wfs.userOutputs().get("output")); + assertEquals(2, wfs.resourcesCreated().size()); + rc = wfs.resourcesCreated().get(1); + assertEquals("id two", rc.resourceId()); + } }