Skip to content

Commit

Permalink
Support editing of certain workflow fields on a provisioned workflow
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Jul 4, 2024
1 parent cf1016f commit 5c466f7
Show file tree
Hide file tree
Showing 10 changed files with 406 additions and 72 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.14...2.x)
### Features
- Support editing of certain workflow fields on a provisioned workflow ([#757](https://github.com/opensearch-project/flow-framework/pull/757))

### Enhancements
- Register system index descriptors through SystemIndexPlugin.getSystemIndexDescriptors ([#750](https://github.com/opensearch-project/flow-framework/pull/750))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ private CommonValue() {}
public static final String WORKFLOW_ID = "workflow_id";
/** Field name for template validation, the flag to indicate if validation is necessary */
public static final String VALIDATION = "validation";
/** The field name for provision workflow within a use case template*/
/** The param name for provision workflow in create API */
public static final String PROVISION_WORKFLOW = "provision";
/** The param name for update workflow field in create API */
public static final String UPDATE_WORKFLOW_FIELDS = "update_fields";
/** The field name for workflow steps. This field represents the name of the workflow steps to be fetched. */
public static final String WORKFLOW_STEP = "workflow_step";
/** The param name for default use case, used by the create workflow API */
Expand Down
47 changes: 41 additions & 6 deletions src/main/java/org/opensearch/flowframework/model/Template.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.CREATED_TIME;
Expand All @@ -53,6 +54,14 @@ public class Template implements ToXContentObject {
public static final String TEMPLATE_FIELD = "template";
/** The template field name for template use case */
public static final String USE_CASE_FIELD = "use_case";
/** Fields which may be updated in the template even if provisioned */
private static final Set<String> UPDATE_FIELD_ALLOWLIST = Set.of(
NAME_FIELD,
DESCRIPTION_FIELD,
USE_CASE_FIELD,
VERSION_FIELD,
UI_METADATA_FIELD
);

private final String name;
private final String description;
Expand Down Expand Up @@ -287,8 +296,8 @@ public Template build() {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject();
xContentBuilder.field(NAME_FIELD, this.name);
xContentBuilder.field(DESCRIPTION_FIELD, this.description);
xContentBuilder.field(USE_CASE_FIELD, this.useCase);
xContentBuilder.field(DESCRIPTION_FIELD, this.description == null ? "" : this.description);
xContentBuilder.field(USE_CASE_FIELD, this.useCase == null ? "" : this.useCase);

if (this.templateVersion != null || !this.compatibilityVersion.isEmpty()) {
xContentBuilder.startObject(VERSION_FIELD);
Expand Down Expand Up @@ -342,9 +351,21 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
* @throws IOException if content can't be parsed correctly
*/
public static Template parse(XContentParser parser) throws IOException {
return parse(parser, false);
}

/**
* Parse raw xContent into a Template instance.
*
* @param parser xContent based content parser
* @param fieldUpdate if set true, will be used for updating an existing template
* @return an instance of the template
* @throws IOException if content can't be parsed correctly
*/
public static Template parse(XContentParser parser, boolean fieldUpdate) throws IOException {
String name = null;
String description = "";
String useCase = "";
String description = null;
String useCase = null;
Version templateVersion = null;
List<Version> compatibilityVersion = new ArrayList<>();
Map<String, Workflow> workflows = new HashMap<>();
Expand All @@ -357,6 +378,12 @@ public static Template parse(XContentParser parser) throws IOException {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
if (fieldUpdate && !UPDATE_FIELD_ALLOWLIST.contains(fieldName)) {
throw new FlowFrameworkException(
"You can not update the field [" + fieldName + "] without updating the whole template.",
RestStatus.BAD_REQUEST
);
}
parser.nextToken();
switch (fieldName) {
case NAME_FIELD:
Expand Down Expand Up @@ -421,8 +448,16 @@ public static Template parse(XContentParser parser) throws IOException {
);
}
}
if (name == null) {
throw new FlowFrameworkException("A template object requires a name.", RestStatus.BAD_REQUEST);
if (!fieldUpdate) {
if (name == null) {
throw new FlowFrameworkException("A template object requires a name.", RestStatus.BAD_REQUEST);
}
if (description == null) {
description = "";
}
if (useCase == null) {
useCase = "";
}
}

return new Builder().name(name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW;
import static org.opensearch.flowframework.common.CommonValue.UPDATE_WORKFLOW_FIELDS;
import static org.opensearch.flowframework.common.CommonValue.USE_CASE;
import static org.opensearch.flowframework.common.CommonValue.VALIDATION;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
Expand Down Expand Up @@ -83,6 +84,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
String workflowId = request.param(WORKFLOW_ID);
String[] validation = request.paramAsStringArray(VALIDATION, new String[] { "all" });
boolean provision = request.paramAsBoolean(PROVISION_WORKFLOW, false);
boolean updateFields = request.paramAsBoolean(UPDATE_WORKFLOW_FIELDS, false);
String useCase = request.param(USE_CASE);
// If provisioning, consume all other params and pass to provision transport action
Map<String, String> params = provision
Expand Down Expand Up @@ -117,11 +119,23 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
if (provision && updateFields) {
// Consume params and content so custom exception is processed
params.keySet().stream().forEach(request::param);
request.content();
FlowFrameworkException ffe = new FlowFrameworkException(
"You can not use both the " + PROVISION_WORKFLOW + " and " + UPDATE_WORKFLOW_FIELDS + " parameters in the same request.",
RestStatus.BAD_REQUEST
);
return channel -> channel.sendResponse(
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
try {

Template template;
Map<String, String> useCaseDefaultsMap = Collections.emptyMap();
if (useCase != null) {
// Reconstruct the template from a substitution-ready use case
String useCaseTemplateFileInStringFormat = ParseUtils.resourceToString(
"/" + DefaultUseCases.getSubstitutionReadyFileByUseCaseName(useCase)
);
Expand Down Expand Up @@ -178,21 +192,25 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
null,
useCaseDefaultsMap
);
XContentParser parserTestJson = ParseUtils.jsonToParser(useCaseTemplateFileInStringFormat);
ensureExpectedToken(XContentParser.Token.START_OBJECT, parserTestJson.currentToken(), parserTestJson);
template = Template.parse(parserTestJson);

XContentParser useCaseParser = ParseUtils.jsonToParser(useCaseTemplateFileInStringFormat);
ensureExpectedToken(XContentParser.Token.START_OBJECT, useCaseParser.currentToken(), useCaseParser);
template = Template.parse(useCaseParser);
} else {
XContentParser parser = request.contentParser();
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
template = Template.parse(parser);
template = Template.parse(parser, updateFields);
}

// If not provisioning, params map is empty. Use it to pass updateFields flag to WorkflowRequest
if (updateFields) {
params = Map.of(UPDATE_WORKFLOW_FIELDS, "true");
}

WorkflowRequest workflowRequest = new WorkflowRequest(
workflowId,
template,
validation,
provision,
provision || updateFields,
params,
useCase,
useCaseDefaultsMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Strings;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.search.SearchRequest;
Expand All @@ -29,6 +30,7 @@
import org.opensearch.flowframework.model.ProvisioningProgress;
import org.opensearch.flowframework.model.State;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Template.Builder;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.workflow.ProcessNode;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
Expand Down Expand Up @@ -233,43 +235,80 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
);
} else {
// This is an existing workflow (PUT)
final boolean isFieldUpdate = request.isUpdateFields();
// Fetch existing entry for time stamps
logger.info("Querying existing workflow from global context: {}", workflowId);
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
client.get(new GetRequest(GLOBAL_CONTEXT_INDEX, workflowId), ActionListener.wrap(getResponse -> {
context.restore();
if (getResponse.isExists()) {
Template existingTemplate = Template.parse(getResponse.getSourceAsString());
// Update existing entry, full document replacement
Template template = new Template.Builder(templateWithUser).createdTime(existingTemplate.createdTime())
.lastUpdatedTime(Instant.now())
.lastProvisionedTime(existingTemplate.lastProvisionedTime())
.build();
Template template;
if (isFieldUpdate) {
// Update only specified fields in existing
Builder builder = new Template.Builder(existingTemplate).lastUpdatedTime(Instant.now());
if (templateWithUser.name() != null) {
builder.name(templateWithUser.name());
}
if (!Strings.isBlank(templateWithUser.description())) {
builder.description(templateWithUser.description());
}
if (!Strings.isBlank(templateWithUser.useCase())) {
builder.useCase(templateWithUser.useCase());
}
if (templateWithUser.templateVersion() != null) {
builder.templateVersion(templateWithUser.templateVersion());
}
if (!templateWithUser.compatibilityVersion().isEmpty()) {
builder.compatibilityVersion(templateWithUser.compatibilityVersion());
}
if (templateWithUser.getUiMetadata() != null) {
builder.uiMetadata(templateWithUser.getUiMetadata());
}
template = builder.build();
} else {
// Update existing entry, full document replacement
template = new Template.Builder(templateWithUser).createdTime(existingTemplate.createdTime())
.lastUpdatedTime(Instant.now())
.lastProvisionedTime(existingTemplate.lastProvisionedTime())
.build();
}
flowFrameworkIndicesHandler.updateTemplateInGlobalContext(
request.getWorkflowId(),
template,
ActionListener.wrap(response -> {
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
request.getWorkflowId(),
Map.ofEntries(
Map.entry(STATE_FIELD, State.NOT_STARTED),
Map.entry(PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.NOT_STARTED)
),
ActionListener.wrap(updateResponse -> {
logger.info("updated workflow {} state to {}", request.getWorkflowId(), State.NOT_STARTED.name());
listener.onResponse(new WorkflowResponse(request.getWorkflowId()));
}, exception -> {
String errorMessage = "Failed to update workflow " + request.getWorkflowId() + " in template index";
logger.error(errorMessage, exception);
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(
new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))
// Ignore state index if updating fields
if (!isFieldUpdate) {
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
request.getWorkflowId(),
Map.ofEntries(
Map.entry(STATE_FIELD, State.NOT_STARTED),
Map.entry(PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.NOT_STARTED)
),
ActionListener.wrap(updateResponse -> {
logger.info(
"updated workflow {} state to {}",
request.getWorkflowId(),
State.NOT_STARTED.name()
);
}
})
);
listener.onResponse(new WorkflowResponse(request.getWorkflowId()));
}, exception -> {
String errorMessage = "Failed to update workflow "
+ request.getWorkflowId()
+ " in template index";
logger.error(errorMessage, exception);
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(
new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))
);
}
})
);
} else {
listener.onResponse(new WorkflowResponse(request.getWorkflowId()));
}
}, exception -> {
String errorMessage = "Failed to update use case template " + request.getWorkflowId();
logger.error(errorMessage, exception);
Expand All @@ -278,7 +317,8 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
} else {
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
}
})
}),
isFieldUpdate
);
} else {
String errorMessage = "Failed to retrieve template (" + workflowId + ") from global context.";
Expand Down
Loading

0 comments on commit 5c466f7

Please sign in to comment.