Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Template and WorkflowState builders #778

Merged
merged 3 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
### Documentation
### Maintenance
### Refactoring
- Improve Template and WorkflowState builders ([#778](https://github.com/opensearch-project/flow-framework/pull/778))
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,8 @@
* @param listener action listener
*/
public void putInitialStateToWorkflowState(String workflowId, User user, ActionListener<IndexResponse> listener) {
WorkflowState state = new WorkflowState.Builder().workflowId(workflowId)
WorkflowState state = WorkflowState.builder()
.workflowId(workflowId)

Check warning on line 361 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L360-L361

Added lines #L360 - L361 were not covered by tests
.state(State.NOT_STARTED.name())
.provisioningProgress(ProvisioningProgress.NOT_STARTED.name())
.user(user)
Expand Down
24 changes: 20 additions & 4 deletions src/main/java/org/opensearch/flowframework/model/Template.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.model.Template.Builder;
import org.opensearch.flowframework.util.ParseUtils;

import java.io.IOException;
Expand Down Expand Up @@ -137,13 +136,13 @@ public static class Builder {
/**
* Empty Constructor for the Builder object
*/
public Builder() {}
private Builder() {}

/**
* Construct a Builder from an existing template
* @param t The existing template to copy
*/
public Builder(Template t) {
private Builder(Template t) {
this.name = t.name();
this.description = t.description();
this.useCase = t.useCase();
Expand Down Expand Up @@ -294,6 +293,23 @@ public Template build() {
}
}

/**
* Instantiate a new Template builder
* @return a new builder instance
*/
public static Builder builder() {
return new Builder();
}

/**
* Instantiate a new Template builder initialized from an existing template
* @param t The existing template to use as the source
* @return a new builder instance initialized from the existing template
*/
public static Builder builder(Template t) {
return new Builder(t);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject();
Expand Down Expand Up @@ -352,7 +368,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
* @return the updated template.
*/
public static Template updateExistingTemplate(Template existingTemplate, Template templateWithNewFields) {
Builder builder = new Template.Builder(existingTemplate).lastUpdatedTime(Instant.now());
Builder builder = Template.builder(existingTemplate).lastUpdatedTime(Instant.now());
if (templateWithNewFields.name() != null) {
builder.name(templateWithNewFields.name());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,15 @@ public static Builder builder() {
return new Builder();
}

/**
* Constructs a builder object for workflowState from an existing state
* @param existingState a WorkflowState object to initialize the builder with
* @return Builder Object initialized with existing state
*/
public static Builder builder(WorkflowState existingState) {
return new Builder(existingState);
}

/**
* Class for constructing a Builder for WorkflowState
*/
Expand All @@ -143,7 +152,23 @@ public static class Builder {
/**
* Empty Constructor for the Builder object
*/
public Builder() {}
private Builder() {}

/**
* Builder from existing state
* @param existingState a WorkflowState object to initialize the builder with
*/
private Builder(WorkflowState existingState) {
this.workflowId = existingState.getWorkflowId();
this.error = existingState.getError();
this.state = existingState.getState();
this.provisioningProgress = existingState.getProvisioningProgress();
this.provisionStartTime = existingState.getProvisionStartTime();
this.provisionEndTime = existingState.getProvisionEndTime();
this.user = existingState.getUser();
this.userOutputs = existingState.userOutputs();
this.resourcesCreated = existingState.resourcesCreated();
}

/**
* Builder method for adding workflowID
Expand Down Expand Up @@ -254,6 +279,44 @@ public WorkflowState build() {
}
}

/**
* Merges two workflow states by updating the fields from an existing state with the (non-null) fields of another one.
* @param existingState An existing Workflow state.
* @param stateWithNewFields A workflow state containing only fields to update.
* @return the updated workflow state.
*/
public static WorkflowState updateExistingWorkflowState(WorkflowState existingState, WorkflowState stateWithNewFields) {
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
Builder builder = WorkflowState.builder(existingState);
if (stateWithNewFields.getWorkflowId() != null) {
builder.workflowId(stateWithNewFields.getWorkflowId());
}
if (stateWithNewFields.getError() != null) {
builder.error(stateWithNewFields.getError());
}
if (stateWithNewFields.getState() != null) {
builder.state(stateWithNewFields.getState());
}
if (stateWithNewFields.getProvisioningProgress() != null) {
builder.provisioningProgress(stateWithNewFields.getProvisioningProgress());
}
if (stateWithNewFields.getProvisionStartTime() != null) {
builder.provisionStartTime(stateWithNewFields.getProvisionStartTime());
}
if (stateWithNewFields.getProvisionEndTime() != null) {
builder.provisionEndTime(stateWithNewFields.getProvisionEndTime());
}
if (stateWithNewFields.getUser() != null) {
builder.user(stateWithNewFields.getUser());
}
if (stateWithNewFields.userOutputs() != null) {
builder.userOutputs(stateWithNewFields.userOutputs());
}
if (stateWithNewFields.resourcesCreated() != null) {
builder.resourcesCreated(stateWithNewFields.resourcesCreated());
}
return builder.build();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject();
Expand Down Expand Up @@ -492,7 +555,7 @@ public Map<String, Object> userOutputs() {
}

/**
* A map of all the resources created
* A list of all the resources created
* @return the resources created
*/
public List<ResourceCreated> resourcesCreated() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
Template existingTemplate = Template.parse(getResponse.getSourceAsString());
Template template = isFieldUpdate
? Template.updateExistingTemplate(existingTemplate, templateWithUser)
: new Template.Builder(templateWithUser).createdTime(existingTemplate.createdTime())
: Template.builder(templateWithUser)
.createdTime(existingTemplate.createdTime())
.lastUpdatedTime(Instant.now())
.lastProvisionedTime(existingTemplate.lastProvisionedTime())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public GetWorkflowStateResponse(WorkflowState workflowState, boolean allStatus)
if (allStatus) {
this.workflowState = workflowState;
} else {
this.workflowState = new WorkflowState.Builder().workflowId(workflowState.getWorkflowId())
this.workflowState = WorkflowState.builder()
.workflowId(workflowState.getWorkflowId())
.error(workflowState.getError())
.state(workflowState.getState())
.resourcesCreated(workflowState.resourcesCreated())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
logger.info("updated workflow {} state to {}", request.getWorkflowId(), State.PROVISIONING);
executeWorkflowAsync(workflowId, provisionProcessSequence, listener);
// update last provisioned field in template
Template newTemplate = new Template.Builder(template).lastProvisionedTime(Instant.now()).build();
Template newTemplate = Template.builder(template).lastProvisionedTime(Instant.now()).build();
flowFrameworkIndicesHandler.updateTemplateInGlobalContext(
request.getWorkflowId(),
newTemplate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ private Template processTemplateCredentials(Template template, Function<String,
processedWorkflows.put(entry.getKey(), new Workflow(entry.getValue().userParams(), processedNodes, entry.getValue().edges()));
}

return new Template.Builder(template).workflows(processedWorkflows).build();
return Template.builder(template).workflows(processedWorkflows).build();
}

/**
Expand Down Expand Up @@ -240,10 +240,10 @@ public Template redactTemplateSecuredFields(User user, Template template) {
}

if (ParseUtils.isAdmin(user)) {
return new Template.Builder(template).workflows(processedWorkflows).build();
return Template.builder(template).workflows(processedWorkflows).build();
}

return new Template.Builder(template).user(null).workflows(processedWorkflows).build();
return Template.builder(template).user(null).workflows(processedWorkflows).build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void testUpdateExistingTemplate() {
now,
null
);
Template updated = new Template.Builder().name("name two").description("description two").useCase("use case two").build();
Template updated = Template.builder().name("name two").description("description two").useCase("use case two").build();
Template merged = Template.updateExistingTemplate(original, updated);
assertEquals("name two", merged.name());
assertEquals("description two", merged.description());
Expand All @@ -128,7 +128,8 @@ public void testUpdateExistingTemplate() {
assertEquals("1.1.1", merged.compatibilityVersion().get(1).toString());
assertEquals("one", merged.getUiMetadata().get("uiMetadata"));

updated = new Template.Builder().templateVersion(Version.fromString("2.2.2"))
updated = Template.builder()
.templateVersion(Version.fromString("2.2.2"))
.compatibilityVersion(List.of(Version.fromString("2.2.2"), Version.fromString("2.2.2")))
.uiMetadata(Map.of("uiMetadata", "two"))
.build();
Expand Down Expand Up @@ -164,4 +165,23 @@ public void testStrings() throws IOException {
assertTrue(t.toYaml().contains("a test template"));
assertTrue(t.toString().contains("a test template"));
}

public void testNullToEmptyString() throws IOException {
Template t = Template.parse("{\"name\":\"test\"}");
assertEquals("test", t.name());
assertEquals("", t.description());
assertEquals("", t.useCase());

XContentParser parser = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.IGNORE_DEPRECATIONS,
"{\"name\":\"test\"}"
);
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
t = Template.parse(parser, true);
String json = t.toJson();
assertTrue(json.contains("\"name\":\"test\""));
assertTrue(json.contains("\"description\":\"\""));
assertTrue(json.contains("\"use_case\":\"\""));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,77 @@ public void testWorkflowState() throws IOException {
}
}

public void testWorkflowStateUpdate() {
// Time travel to guarantee update increments
Instant now = Instant.now().minusMillis(100);

WorkflowState wfs = WorkflowState.builder()
.workflowId("1")
.error("error one")
.state("state one")
.provisioningProgress("progress one")
.provisionStartTime(now)
.provisionEndTime(now)
.user(new User("one", Collections.emptyList(), Collections.emptyList(), Collections.emptyList()))
.userOutputs(Map.of("output", "one"))
.resourcesCreated(List.of(new ResourceCreated("", "", "", "id one")))
.build();

assertEquals("1", wfs.getWorkflowId());
assertEquals("error one", wfs.getError());
assertEquals("state one", wfs.getState());
assertEquals("progress one", wfs.getProvisioningProgress());
assertEquals(now, wfs.getProvisionStartTime());
assertEquals(now, wfs.getProvisionEndTime());
assertEquals("one", wfs.getUser().getName());
assertEquals(1, wfs.userOutputs().size());
assertEquals("one", wfs.userOutputs().get("output"));
assertEquals(1, wfs.resourcesCreated().size());
ResourceCreated rc = wfs.resourcesCreated().get(0);
assertEquals("id one", rc.resourceId());

WorkflowState update = WorkflowState.builder()
.workflowId("2")
.error("error two")
.state("state two")
.provisioningProgress("progress two")
.user(new User("two", Collections.emptyList(), Collections.emptyList(), Collections.emptyList()))
.build();

wfs = WorkflowState.updateExistingWorkflowState(wfs, update);
assertEquals("2", wfs.getWorkflowId());
assertEquals("error two", wfs.getError());
assertEquals("state two", wfs.getState());
assertEquals("progress two", wfs.getProvisioningProgress());
assertEquals(now, wfs.getProvisionStartTime());
assertEquals(now, wfs.getProvisionEndTime());
assertEquals("two", wfs.getUser().getName());
assertEquals(1, wfs.userOutputs().size());
assertEquals("one", wfs.userOutputs().get("output"));
assertEquals(1, wfs.resourcesCreated().size());
rc = wfs.resourcesCreated().get(0);
assertEquals("id one", rc.resourceId());

now = Instant.now().minusMillis(100);
update = WorkflowState.builder()
.provisionStartTime(now)
.provisionEndTime(now)
.userOutputs(Map.of("output", "two"))
.resourcesCreated(List.of(wfs.resourcesCreated().get(0), new ResourceCreated("", "", "", "id two")))
.build();

wfs = WorkflowState.updateExistingWorkflowState(wfs, update);
assertEquals("2", wfs.getWorkflowId());
assertEquals("error two", wfs.getError());
assertEquals("state two", wfs.getState());
assertEquals("progress two", wfs.getProvisioningProgress());
assertEquals(now, wfs.getProvisionStartTime());
assertEquals(now, wfs.getProvisionEndTime());
assertEquals("two", wfs.getUser().getName());
assertEquals(1, wfs.userOutputs().size());
assertEquals("two", wfs.userOutputs().get("output"));
assertEquals(2, wfs.resourcesCreated().size());
rc = wfs.resourcesCreated().get(1);
assertEquals("id two", rc.resourceId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public void testCreateAndProvisionLocalModelWorkflow() throws Exception {
)
.collect(Collectors.toList());
Workflow missingInputs = new Workflow(originalWorkflow.userParams(), modifiednodes, originalWorkflow.edges());
Template templateWithMissingInputs = new Template.Builder(template).workflows(Map.of(PROVISION_WORKFLOW, missingInputs)).build();
Template templateWithMissingInputs = Template.builder(template).workflows(Map.of(PROVISION_WORKFLOW, missingInputs)).build();

// Hit Create Workflow API with invalid template
Response response = createWorkflow(client(), templateWithMissingInputs);
Expand Down Expand Up @@ -240,7 +240,7 @@ public void testCreateAndProvisionCyclicalTemplate() throws Exception {
List.of(new WorkflowEdge("workflow_step_2", "workflow_step_3"), new WorkflowEdge("workflow_step_3", "workflow_step_2"))
);

Template cyclicalTemplate = new Template.Builder(template).workflows(Map.of(PROVISION_WORKFLOW, cyclicalWorkflow)).build();
Template cyclicalTemplate = Template.builder(template).workflows(Map.of(PROVISION_WORKFLOW, cyclicalWorkflow)).build();

// Hit dry run
ResponseException exception = expectThrows(ResponseException.class, () -> createWorkflowValidation(client(), cyclicalTemplate));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ public void testUpdateWorkflow() {
ActionListener<GetResponse> getListener = invocation.getArgument(1);
GetResponse getResponse = mock(GetResponse.class);
when(getResponse.isExists()).thenReturn(true);
when(getResponse.getSourceAsString()).thenReturn(new Template.Builder().name("test").build().toJson());
when(getResponse.getSourceAsString()).thenReturn(Template.builder().name("test").build().toJson());
getListener.onResponse(getResponse);
return null;
}).when(client).get(any(GetRequest.class), any());
Expand Down Expand Up @@ -425,7 +425,7 @@ public void testUpdateWorkflowWithField() {
ActionListener<WorkflowResponse> listener = mock(ActionListener.class);
WorkflowRequest updateWorkflow = new WorkflowRequest(
"1",
new Template.Builder().name("new name").description("test").useCase(null).uiMetadata(Map.of("foo", "bar")).build(),
Template.builder().name("new name").description("test").useCase(null).uiMetadata(Map.of("foo", "bar")).build(),
Map.of(UPDATE_WORKFLOW_FIELDS, "true")
);

Expand Down Expand Up @@ -463,7 +463,8 @@ public void testUpdateWorkflowWithField() {

updateWorkflow = new WorkflowRequest(
"1",
new Template.Builder().useCase("foo")
Template.builder()
.useCase("foo")
.templateVersion(Version.CURRENT)
.compatibilityVersion(List.of(Version.V_2_0_0, Version.CURRENT))
.build(),
Expand Down
Loading