Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support editing of certain workflow fields on a provisioned workflow #757

Merged
merged 5 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/test_security.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ on:
pull_request:
types: [opened, synchronize, reopened]

env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true

jobs:
Get-CI-Image-Tag:
uses: opensearch-project/opensearch-build/.github/workflows/get-ci-image-tag.yml@main
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.14...2.x)
### Features
- Support editing of certain workflow fields on a provisioned workflow ([#757](https://github.com/opensearch-project/flow-framework/pull/757))

### Enhancements
- Register system index descriptors through SystemIndexPlugin.getSystemIndexDescriptors ([#750](https://github.com/opensearch-project/flow-framework/pull/750))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ private CommonValue() {}
public static final String WORKFLOW_ID = "workflow_id";
/** Field name for template validation, the flag to indicate if validation is necessary */
public static final String VALIDATION = "validation";
/** The field name for provision workflow within a use case template*/
/** The param name for provision workflow in create API */
public static final String PROVISION_WORKFLOW = "provision";
/** The param name for update workflow field in create API */
public static final String UPDATE_WORKFLOW_FIELDS = "update_fields";
/** The field name for workflow steps. This field represents the name of the workflow steps to be fetched. */
public static final String WORKFLOW_STEP = "workflow_step";
/** The param name for default use case, used by the create workflow API */
Expand Down
47 changes: 41 additions & 6 deletions src/main/java/org/opensearch/flowframework/model/Template.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.CREATED_TIME;
Expand All @@ -53,6 +54,14 @@
public static final String TEMPLATE_FIELD = "template";
/** The template field name for template use case */
public static final String USE_CASE_FIELD = "use_case";
/** Fields which may be updated in the template even if provisioned */
private static final Set<String> UPDATE_FIELD_ALLOWLIST = Set.of(
NAME_FIELD,
DESCRIPTION_FIELD,
USE_CASE_FIELD,
VERSION_FIELD,
UI_METADATA_FIELD
);
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved

private final String name;
private final String description;
Expand Down Expand Up @@ -287,8 +296,8 @@
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject();
xContentBuilder.field(NAME_FIELD, this.name);
xContentBuilder.field(DESCRIPTION_FIELD, this.description);
xContentBuilder.field(USE_CASE_FIELD, this.useCase);
xContentBuilder.field(DESCRIPTION_FIELD, this.description == null ? "" : this.description);
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
xContentBuilder.field(USE_CASE_FIELD, this.useCase == null ? "" : this.useCase);
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved

if (this.templateVersion != null || !this.compatibilityVersion.isEmpty()) {
xContentBuilder.startObject(VERSION_FIELD);
Expand Down Expand Up @@ -342,9 +351,21 @@
* @throws IOException if content can't be parsed correctly
*/
public static Template parse(XContentParser parser) throws IOException {
return parse(parser, false);
}

/**
* Parse raw xContent into a Template instance.
*
* @param parser xContent based content parser
* @param fieldUpdate if set true, will be used for updating an existing template
* @return an instance of the template
* @throws IOException if content can't be parsed correctly
*/
public static Template parse(XContentParser parser, boolean fieldUpdate) throws IOException {
String name = null;
String description = "";
String useCase = "";
String description = null;
String useCase = null;
Version templateVersion = null;
List<Version> compatibilityVersion = new ArrayList<>();
Map<String, Workflow> workflows = new HashMap<>();
Expand All @@ -357,6 +378,12 @@
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
if (fieldUpdate && !UPDATE_FIELD_ALLOWLIST.contains(fieldName)) {
throw new FlowFrameworkException(
"You can not update the field [" + fieldName + "] without updating the whole template.",
RestStatus.BAD_REQUEST
);
}
parser.nextToken();
switch (fieldName) {
case NAME_FIELD:
Expand Down Expand Up @@ -421,8 +448,16 @@
);
}
}
if (name == null) {
throw new FlowFrameworkException("A template object requires a name.", RestStatus.BAD_REQUEST);
if (!fieldUpdate) {
if (name == null) {
throw new FlowFrameworkException("A template object requires a name.", RestStatus.BAD_REQUEST);

Check warning on line 453 in src/main/java/org/opensearch/flowframework/model/Template.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/Template.java#L453

Added line #L453 was not covered by tests
}
if (description == null) {
description = "";

Check warning on line 456 in src/main/java/org/opensearch/flowframework/model/Template.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/Template.java#L456

Added line #L456 was not covered by tests
}
if (useCase == null) {
useCase = "";

Check warning on line 459 in src/main/java/org/opensearch/flowframework/model/Template.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/Template.java#L459

Added line #L459 was not covered by tests
}
}

return new Builder().name(name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW;
import static org.opensearch.flowframework.common.CommonValue.UPDATE_WORKFLOW_FIELDS;
import static org.opensearch.flowframework.common.CommonValue.USE_CASE;
import static org.opensearch.flowframework.common.CommonValue.VALIDATION;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
Expand Down Expand Up @@ -83,6 +84,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
String workflowId = request.param(WORKFLOW_ID);
String[] validation = request.paramAsStringArray(VALIDATION, new String[] { "all" });
boolean provision = request.paramAsBoolean(PROVISION_WORKFLOW, false);
boolean updateFields = request.paramAsBoolean(UPDATE_WORKFLOW_FIELDS, false);
String useCase = request.param(USE_CASE);
// If provisioning, consume all other params and pass to provision transport action
Map<String, String> params = provision
Expand Down Expand Up @@ -117,11 +119,23 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
if (provision && updateFields) {
// Consume params and content so custom exception is processed
params.keySet().stream().forEach(request::param);
request.content();
FlowFrameworkException ffe = new FlowFrameworkException(
"You can not use both the " + PROVISION_WORKFLOW + " and " + UPDATE_WORKFLOW_FIELDS + " parameters in the same request.",
RestStatus.BAD_REQUEST
);
return channel -> channel.sendResponse(
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
try {

Template template;
Map<String, String> useCaseDefaultsMap = Collections.emptyMap();
if (useCase != null) {
// Reconstruct the template from a substitution-ready use case
String useCaseTemplateFileInStringFormat = ParseUtils.resourceToString(
"/" + DefaultUseCases.getSubstitutionReadyFileByUseCaseName(useCase)
);
Expand Down Expand Up @@ -178,21 +192,25 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
null,
useCaseDefaultsMap
);
XContentParser parserTestJson = ParseUtils.jsonToParser(useCaseTemplateFileInStringFormat);
ensureExpectedToken(XContentParser.Token.START_OBJECT, parserTestJson.currentToken(), parserTestJson);
template = Template.parse(parserTestJson);

XContentParser useCaseParser = ParseUtils.jsonToParser(useCaseTemplateFileInStringFormat);
ensureExpectedToken(XContentParser.Token.START_OBJECT, useCaseParser.currentToken(), useCaseParser);
template = Template.parse(useCaseParser);
} else {
XContentParser parser = request.contentParser();
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
template = Template.parse(parser);
template = Template.parse(parser, updateFields);
}

// If not provisioning, params map is empty. Use it to pass updateFields flag to WorkflowRequest
if (updateFields) {
params = Map.of(UPDATE_WORKFLOW_FIELDS, "true");
}

WorkflowRequest workflowRequest = new WorkflowRequest(
workflowId,
template,
validation,
provision,
provision || updateFields,
params,
useCase,
useCaseDefaultsMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Strings;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.search.SearchRequest;
Expand All @@ -29,6 +30,7 @@
import org.opensearch.flowframework.model.ProvisioningProgress;
import org.opensearch.flowframework.model.State;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Template.Builder;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.workflow.ProcessNode;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
Expand Down Expand Up @@ -233,43 +235,80 @@
);
} else {
// This is an existing workflow (PUT)
final boolean isFieldUpdate = request.isUpdateFields();
// Fetch existing entry for time stamps
logger.info("Querying existing workflow from global context: {}", workflowId);
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
client.get(new GetRequest(GLOBAL_CONTEXT_INDEX, workflowId), ActionListener.wrap(getResponse -> {
context.restore();
if (getResponse.isExists()) {
Template existingTemplate = Template.parse(getResponse.getSourceAsString());
// Update existing entry, full document replacement
Template template = new Template.Builder(templateWithUser).createdTime(existingTemplate.createdTime())
.lastUpdatedTime(Instant.now())
.lastProvisionedTime(existingTemplate.lastProvisionedTime())
.build();
Template template;
if (isFieldUpdate) {
// Update only specified fields in existing
Builder builder = new Template.Builder(existingTemplate).lastUpdatedTime(Instant.now());
if (templateWithUser.name() != null) {
builder.name(templateWithUser.name());
}
if (!Strings.isBlank(templateWithUser.description())) {
builder.description(templateWithUser.description());
}
if (!Strings.isBlank(templateWithUser.useCase())) {
builder.useCase(templateWithUser.useCase());
}
if (templateWithUser.templateVersion() != null) {
builder.templateVersion(templateWithUser.templateVersion());
}
if (!templateWithUser.compatibilityVersion().isEmpty()) {
builder.compatibilityVersion(templateWithUser.compatibilityVersion());
}
if (templateWithUser.getUiMetadata() != null) {
builder.uiMetadata(templateWithUser.getUiMetadata());
}
template = builder.build();
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
} else {
// Update existing entry, full document replacement
template = new Template.Builder(templateWithUser).createdTime(existingTemplate.createdTime())
.lastUpdatedTime(Instant.now())
.lastProvisionedTime(existingTemplate.lastProvisionedTime())
.build();
}
flowFrameworkIndicesHandler.updateTemplateInGlobalContext(
request.getWorkflowId(),
template,
ActionListener.wrap(response -> {
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
request.getWorkflowId(),
Map.ofEntries(
Map.entry(STATE_FIELD, State.NOT_STARTED),
Map.entry(PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.NOT_STARTED)
),
ActionListener.wrap(updateResponse -> {
logger.info("updated workflow {} state to {}", request.getWorkflowId(), State.NOT_STARTED.name());
listener.onResponse(new WorkflowResponse(request.getWorkflowId()));
}, exception -> {
String errorMessage = "Failed to update workflow " + request.getWorkflowId() + " in template index";
logger.error(errorMessage, exception);
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(
new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))
// Ignore state index if updating fields
if (!isFieldUpdate) {
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
request.getWorkflowId(),
Map.ofEntries(
Map.entry(STATE_FIELD, State.NOT_STARTED),
Map.entry(PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.NOT_STARTED)
),
ActionListener.wrap(updateResponse -> {
logger.info(
"updated workflow {} state to {}",
request.getWorkflowId(),
State.NOT_STARTED.name()
);
}
})
);
listener.onResponse(new WorkflowResponse(request.getWorkflowId()));
}, exception -> {
String errorMessage = "Failed to update workflow "
+ request.getWorkflowId()

Check warning on line 297 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L296-L297

Added lines #L296 - L297 were not covered by tests
+ " in template index";
logger.error(errorMessage, exception);

Check warning on line 299 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L299

Added line #L299 was not covered by tests
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);

Check warning on line 301 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L301

Added line #L301 was not covered by tests
} else {
listener.onFailure(
new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))

Check warning on line 304 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L303-L304

Added lines #L303 - L304 were not covered by tests
);
}
})

Check warning on line 307 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L307

Added line #L307 was not covered by tests
);
} else {
listener.onResponse(new WorkflowResponse(request.getWorkflowId()));
}
}, exception -> {
String errorMessage = "Failed to update use case template " + request.getWorkflowId();
logger.error(errorMessage, exception);
Expand All @@ -278,7 +317,8 @@
} else {
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
}
})
}),
isFieldUpdate
);
} else {
String errorMessage = "Failed to retrieve template (" + workflowId + ") from global context.";
Expand Down
Loading
Loading