diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 000000000..d792d3287 --- /dev/null +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1,16 @@ +### Description +[Describe what this change achieves] + +### Related Issues +Resolves #[Issue number to be closed when this PR is merged] + + +### Check List +- [ ] New functionality includes testing. +- [ ] New functionality has been documented. +- [ ] API changes companion pull request [created](https://github.com/opensearch-project/opensearch-api-specification/blob/main/DEVELOPER_GUIDE.md). +- [ ] Commits are signed per the DCO using `--signoff`. +- [ ] Public documentation issue/PR [created](https://github.com/opensearch-project/documentation-website/issues/new/choose). + +By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. +For more information on following Developer Certificate of Origin and signing off your commits, please check [here](https://github.com/opensearch-project/flow-framework/blob/main/CONTRIBUTING.md#developer-certificate-of-origin). 
diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 1b920f3ce..deda4a7e5 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -1,10 +1,12 @@ name: CI on: + workflow_dispatch: push: branches-ignore: - 'whitesource-remediate/**' - 'backport/**' + - 'create-pull-request/**' pull_request: types: [opened, synchronize, reopened] @@ -37,7 +39,7 @@ jobs: needs: [spotless, javadoc] strategy: matrix: - os: [ubuntu-latest, macos-13, windows-latest] + os: [ubuntu-latest, macos-latest, windows-latest] java: [21] name: Test JDK${{ matrix.java }}, ${{ matrix.os }} runs-on: ${{ matrix.os }} @@ -63,7 +65,7 @@ jobs: strategy: fail-fast: false matrix: - os: [ubuntu-latest, macos-13, windows-latest] + os: [ubuntu-latest, macos-latest, windows-latest] java: [21] name: Integ Test JDK${{ matrix.java }}, ${{ matrix.os }} runs-on: ${{ matrix.os }} @@ -82,7 +84,7 @@ jobs: strategy: fail-fast: false matrix: - os: [ubuntu-latest, macos-13, windows-latest] + os: [ubuntu-latest, macos-latest, windows-latest] java: [21] name: Multi-Node Integ Test JDK${{ matrix.java }}, ${{ matrix.os }} runs-on: ${{ matrix.os }} diff --git a/.github/workflows/test_bwc.yml b/.github/workflows/test_bwc.yml index bdb57733f..d5b31b7a5 100644 --- a/.github/workflows/test_bwc.yml +++ b/.github/workflows/test_bwc.yml @@ -1,11 +1,13 @@ name: BWC on: + workflow_dispatch: push: - branches: - - "**" + branches-ignore: + - 'whitesource-remediate/**' + - 'backport/**' + - 'create-pull-request/**' pull_request: - branches: - - "**" + types: [opened, synchronize, reopened] jobs: Build-ff-linux: diff --git a/.github/workflows/test_security.yml b/.github/workflows/test_security.yml index a1c77f39e..d3db98a00 100644 --- a/.github/workflows/test_security.yml +++ b/.github/workflows/test_security.yml @@ -1,12 +1,17 @@ name: Security test workflow for Flow Framework on: + workflow_dispatch: push: branches-ignore: - 'whitesource-remediate/**' - 'backport/**' + - 'create-pull-request/**' 
pull_request: types: [opened, synchronize, reopened] +env: + ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true + jobs: Get-CI-Image-Tag: uses: opensearch-project/opensearch-build/.github/workflows/get-ci-image-tag.yml@main diff --git a/.github/workflows/wrapper.yml b/.github/workflows/wrapper.yml index 24c9413db..5171e187d 100644 --- a/.github/workflows/wrapper.yml +++ b/.github/workflows/wrapper.yml @@ -14,4 +14,4 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - - uses: gradle/wrapper-validation-action@v3 + - uses: gradle/actions/wrapper-validation@v3 diff --git a/CHANGELOG.md b/CHANGELOG.md index d6242dff8..dbc57cbe9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,10 +16,16 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.14...2.x) ### Features +- Support editing of certain workflow fields on a provisioned workflow ([#757](https://github.com/opensearch-project/flow-framework/pull/757)) +- Add allow_delete parameter to Deprovision API ([#763](https://github.com/opensearch-project/flow-framework/pull/763)) + ### Enhancements +- Register system index descriptors through SystemIndexPlugin.getSystemIndexDescriptors ([#750](https://github.com/opensearch-project/flow-framework/pull/750)) + ### Bug Fixes ### Infrastructure ### Documentation ### Maintenance ### Refactoring +- Improve Template and WorkflowState builders ([#778](https://github.com/opensearch-project/flow-framework/pull/778)) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 76ff9d0f2..003aac02d 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,4 +1,4 @@ -- [Contributing to OpenSearch AI Flow Framework](#contributing-to-flow-framework) +- [Contributing to OpenSearch Flow Framework](#contributing-to-opensearch-flow-framework) - [First Things First](#first-things-first) - [Ways to Contribute](#ways-to-contribute) - [Bug Reports](#bug-reports) @@ -100,7 +100,7 @@ Adding in 
the change is two step process - Any new functionality requires testing. Your PR will trigger an automatic assessment of the code coverage of the lines you've added. You should add unit and/or integration tests to exercise as much of your new code as possible. -If you'd like to preview your coverage before submitting your PR, to identify lines of code which are not tested, you may run `./gradlew diffCoverage` and review the report available in the project build directory at `build/reports/jacoco/diffCoverage/html/index.html`. +If you'd like to preview your coverage before submitting your PR, to identify lines of code which are not tested, you may run `./gradlew test deltaCoverage` and review the report available in the project build directory at `build/reports/coverage-reports/delta-coverage/html/index.html`. ## Review Process diff --git a/build.gradle b/build.gradle index 698ae8a56..e0f38276c 100644 --- a/build.gradle +++ b/build.gradle @@ -47,13 +47,13 @@ buildscript { dependencies { classpath "org.opensearch.gradle:build-tools:${opensearch_version}" classpath "com.diffplug.spotless:spotless-plugin-gradle:6.25.0" - classpath "com.github.form-com.diff-coverage-gradle:diff-coverage:0.9.5" } } plugins { id "de.undercouch.download" version "5.6.0" - id "org.gradle.test-retry" version "1.5.9" apply false + id "org.gradle.test-retry" version "1.5.10" apply false + id "io.github.surpsg.delta-coverage" version "2.3.0" } apply plugin: 'java' @@ -66,9 +66,6 @@ apply plugin: 'com.diffplug.spotless' apply from: 'formatter/formatting.gradle' // for javadocs and checks spotless doesn't do apply plugin: 'checkstyle' -// for coverage and diff -apply plugin: 'jacoco' -apply plugin: 'com.form.diff-coverage' def pluginName = 'opensearch-flow-framework' def pluginDescription = 'OpenSearch plugin that enables builders to innovate AI apps on OpenSearch' @@ -167,14 +164,14 @@ configurations { dependencies { implementation "org.opensearch:opensearch:${opensearch_version}" - 
implementation 'org.junit.jupiter:junit-jupiter:5.10.2' + implementation 'org.junit.jupiter:junit-jupiter:5.10.3' api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}" api group: 'org.opensearch.client', name: 'opensearch-rest-client', version: "${opensearch_version}" - implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.14.0' + implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.15.0' implementation "org.opensearch:common-utils:${common_utils_version}" implementation 'com.amazonaws:aws-encryption-sdk-java:2.4.1' implementation 'org.bouncycastle:bcprov-jdk18on:1.78.1' - api "org.apache.httpcomponents.core5:httpcore5:5.2.2" + api "org.apache.httpcomponents.core5:httpcore5:5.2.5" implementation "jakarta.json.bind:jakarta.json.bind-api:3.0.1" implementation "org.glassfish:jakarta.json:2.0.1" implementation "org.eclipse:yasson:3.0.3" @@ -188,8 +185,8 @@ dependencies { configurations.all { resolutionStrategy { force("com.google.guava:guava:33.2.1-jre") // CVE for 31.1, keep to force transitive dependencies - force("org.eclipse.platform:org.eclipse.core.runtime:3.31.0") // CVE for < 3.29.0 - force("com.fasterxml.jackson.core:jackson-core:2.17.1") // Dependency Jar Hell + force("com.fasterxml.jackson.core:jackson-core:2.17.2") // Dependency Jar Hell + force("org.apache.httpcomponents.core5:httpcore5:5.2.5") // Dependency Jar Hell } } } @@ -394,7 +391,7 @@ testClusters.integTest { } - // Install Flow Framwork Plugin on integTest cluster nodes + // Install Flow Framework Plugin on integTest cluster nodes plugin(project.tasks.bundlePlugin.archiveFile) // Cluster shrink exception thrown if we try to set numberOfNodes to 1, so only apply if > 1 @@ -683,38 +680,26 @@ task updateVersion { } } -diffCoverageReport { - afterEvaluate { - // Get uncommitted files via git diff - // https://github.com/form-com/diff-coverage-gradle/issues/73 - def file = 
Files.createTempFile(URLEncoder.encode(project.name, 'UTF-8'), '.diff').toFile() - def diffBase = 'refs/remotes/origin/main' - // Only run locally - if (!System.getenv('CI')) { - file.withOutputStream { out -> - exec { - ignoreExitValue true - commandLine 'git', 'diff', '--no-color', '--minimal', diffBase - standardOutput = out - } - } - } - diffSource.file = file +deltaCoverageReport { + diffSource { + git.compareWith("refs/remotes/origin/main") } - // View report at build/reports/jacoco/diffCoverage/html/index.html - reports { - html = true + violationRules { + failOnViolation.set(true) + rule(io.github.surpsg.deltacoverage.gradle.CoverageEntity.LINE) { + minCoverageRatio.set(0.75d) + } + rule(io.github.surpsg.deltacoverage.gradle.CoverageEntity.BRANCH) { + minCoverageRatio.set(0.6d) + } } - violationRules { - minBranches = 0.60 - minLines = 0.75 - failOnViolation = true + reports { + html.set(true) } } - tasks.withType(AbstractPublishToMaven) { def predicate = provider { publication.name == "pluginZip" diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index e6441136f..2c3521197 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 381baa9ce..68e8816d7 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,7 +1,7 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionSha256Sum=544c35d6bd849ae8a5ed0bcea39ba677dc40f49df7d1835561582da2009b961d -distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip +distributionSha256Sum=d725d707bfabd4dfdc958c624003b3c80accc03f7037b5122c4b1d0ef15cecab +distributionUrl=https\://services.gradle.org/distributions/gradle-8.9-bin.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME diff --git a/gradlew b/gradlew index 
1aa94a426..f5feea6d6 100755 --- a/gradlew +++ b/gradlew @@ -15,6 +15,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # +# SPDX-License-Identifier: Apache-2.0 +# ############################################################################## # @@ -55,7 +57,7 @@ # Darwin, MinGW, and NonStop. # # (3) This script is generated from the Groovy template -# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt # within the Gradle project. # # You can find Gradle at https://github.com/gradle/gradle/. @@ -84,7 +86,8 @@ done # shellcheck disable=SC2034 APP_BASE_NAME=${0##*/} # Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) -APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit +APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s +' "$PWD" ) || exit # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD=maximum diff --git a/gradlew.bat b/gradlew.bat index 7101f8e46..9b42019c7 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -13,6 +13,8 @@ @rem See the License for the specific language governing permissions and @rem limitations under the License. 
@rem +@rem SPDX-License-Identifier: Apache-2.0 +@rem @if "%DEBUG%"=="" @echo off @rem ########################################################################## diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index 5fc53ec9a..f69534a77 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -59,9 +59,11 @@ import org.opensearch.flowframework.util.EncryptorUtils; import org.opensearch.flowframework.workflow.WorkflowProcessSorter; import org.opensearch.flowframework.workflow.WorkflowStepFactory; +import org.opensearch.indices.SystemIndexDescriptor; import org.opensearch.ml.client.MachineLearningNodeClient; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.SystemIndexPlugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.rest.RestController; import org.opensearch.rest.RestHandler; @@ -75,9 +77,12 @@ import java.util.List; import java.util.function.Supplier; +import static org.opensearch.flowframework.common.CommonValue.CONFIG_INDEX; import static org.opensearch.flowframework.common.CommonValue.DEPROVISION_WORKFLOW_THREAD_POOL; import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX; +import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX; import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW_THREAD_POOL; +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL; import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED; import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS; @@ -88,7 +93,7 @@ /** * An 
OpenSearch plugin that enables builders to innovate AI apps on OpenSearch. */ -public class FlowFrameworkPlugin extends Plugin implements ActionPlugin { +public class FlowFrameworkPlugin extends Plugin implements ActionPlugin, SystemIndexPlugin { private FlowFrameworkSettings flowFrameworkSettings; @@ -128,13 +133,7 @@ public Collection createComponents( flowFrameworkSettings, client ); - WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter( - workflowStepFactory, - threadPool, - clusterService, - client, - flowFrameworkSettings - ); + WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool, flowFrameworkSettings); return List.of(workflowStepFactory, workflowProcessSorter, encryptorUtils, flowFrameworkIndicesHandler, flowFrameworkSettings); } @@ -210,4 +209,13 @@ public List> getExecutorBuilders(Settings settings) { ); } + @Override + public Collection getSystemIndexDescriptors(Settings settings) { + return List.of( + new SystemIndexDescriptor(CONFIG_INDEX, "Flow Framework Config index"), + new SystemIndexDescriptor(GLOBAL_CONTEXT_INDEX, "Flow Framework Global Context index"), + new SystemIndexDescriptor(WORKFLOW_STATE_INDEX, "Flow Framework Workflow State index") + ); + } + } diff --git a/src/main/java/org/opensearch/flowframework/common/CommonValue.java b/src/main/java/org/opensearch/flowframework/common/CommonValue.java index b1fe943d5..f291cff1c 100644 --- a/src/main/java/org/opensearch/flowframework/common/CommonValue.java +++ b/src/main/java/org/opensearch/flowframework/common/CommonValue.java @@ -68,8 +68,12 @@ private CommonValue() {} public static final String WORKFLOW_ID = "workflow_id"; /** Field name for template validation, the flag to indicate if validation is necessary */ public static final String VALIDATION = "validation"; + /** Param name for allow deletion during deprovisioning */ + public static final String ALLOW_DELETE = "allow_delete"; /** The field name for provision workflow 
within a use case template*/ public static final String PROVISION_WORKFLOW = "provision"; + /** The param name for update workflow field in create API */ + public static final String UPDATE_WORKFLOW_FIELDS = "update_fields"; /** The field name for workflow steps. This field represents the name of the workflow steps to be fetched. */ public static final String WORKFLOW_STEP = "workflow_step"; /** The param name for default use case, used by the create workflow API */ diff --git a/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java b/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java index 2d73c0ba8..f54a0a41d 100644 --- a/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java +++ b/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java @@ -18,7 +18,9 @@ import org.opensearch.flowframework.workflow.CreateSearchPipelineStep; import org.opensearch.flowframework.workflow.DeleteAgentStep; import org.opensearch.flowframework.workflow.DeleteConnectorStep; +import org.opensearch.flowframework.workflow.DeleteIngestPipelineStep; import org.opensearch.flowframework.workflow.DeleteModelStep; +import org.opensearch.flowframework.workflow.DeleteSearchPipelineStep; import org.opensearch.flowframework.workflow.DeployModelStep; import org.opensearch.flowframework.workflow.NoOpStep; import org.opensearch.flowframework.workflow.RegisterAgentStep; @@ -56,15 +58,23 @@ public enum WorkflowResources { /** Workflow steps for deploying/undeploying a model and associated created resource */ DEPLOY_MODEL(DeployModelStep.NAME, WorkflowResources.MODEL_ID, UndeployModelStep.NAME, null), /** Workflow steps for creating an ingest-pipeline and associated created resource */ - CREATE_INGEST_PIPELINE(CreateIngestPipelineStep.NAME, WorkflowResources.PIPELINE_ID, null, UpdateIngestPipelineStep.NAME), // TODO - // delete - // step + CREATE_INGEST_PIPELINE( + CreateIngestPipelineStep.NAME, + WorkflowResources.PIPELINE_ID, + 
DeleteIngestPipelineStep.NAME, + UpdateIngestPipelineStep.NAME + ), /** Workflow steps for creating an ingest-pipeline and associated created resource */ - CREATE_SEARCH_PIPELINE(CreateSearchPipelineStep.NAME, WorkflowResources.PIPELINE_ID, null, UpdateSearchPipelineStep.NAME), // TODO - // delete - // step + CREATE_SEARCH_PIPELINE( + CreateSearchPipelineStep.NAME, + WorkflowResources.PIPELINE_ID, + DeleteSearchPipelineStep.NAME, + UpdateSearchPipelineStep.NAME + ), /** Workflow steps for creating an index and associated created resource */ CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, NoOpStep.NAME, UpdateIndexStep.NAME), + /** Workflow steps for reindexing a source index to a destination index and the associated created resource */ + REINDEX(ReindexStep.NAME, WorkflowResources.INDEX_NAME, NoOpStep.NAME, null), /** Workflow steps for registering/deleting an agent and the associated created resource */ REGISTER_AGENT(RegisterAgentStep.NAME, WorkflowResources.AGENT_ID, DeleteAgentStep.NAME, null); diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java index 8fcfd1207..18f0a9780 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -357,7 +357,8 @@ public void initializeConfigIndex(ActionListener listener) { * @param listener action listener */ public void putInitialStateToWorkflowState(String workflowId, User user, ActionListener listener) { - WorkflowState state = new WorkflowState.Builder().workflowId(workflowId) + WorkflowState state = WorkflowState.builder() + .workflowId(workflowId) .state(State.NOT_STARTED.name()) .provisioningProgress(ProvisioningProgress.NOT_STARTED.name()) .user(user) diff --git a/src/main/java/org/opensearch/flowframework/model/Template.java 
b/src/main/java/org/opensearch/flowframework/model/Template.java index 71632d003..f4d2ae600 100644 --- a/src/main/java/org/opensearch/flowframework/model/Template.java +++ b/src/main/java/org/opensearch/flowframework/model/Template.java @@ -8,6 +8,7 @@ */ package org.opensearch.flowframework.model; +import org.apache.logging.log4j.util.Strings; import org.opensearch.Version; import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.json.JsonXContent; @@ -29,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; import static org.opensearch.flowframework.common.CommonValue.CREATED_TIME; @@ -53,6 +55,14 @@ public class Template implements ToXContentObject { public static final String TEMPLATE_FIELD = "template"; /** The template field name for template use case */ public static final String USE_CASE_FIELD = "use_case"; + /** Fields which may be updated in the template even if provisioned */ + public static final Set UPDATE_FIELD_ALLOWLIST = Set.of( + NAME_FIELD, + DESCRIPTION_FIELD, + USE_CASE_FIELD, + VERSION_FIELD, + UI_METADATA_FIELD + ); private final String name; private final String description; @@ -77,9 +87,9 @@ public class Template implements ToXContentObject { * @param workflows Workflow graph definitions corresponding to the defined operations. 
* @param uiMetadata The UI metadata related to the given workflow * @param user The user extracted from the thread context from the request - * @param createdTime Created time in milliseconds since the epoch - * @param lastUpdatedTime Last Updated time in milliseconds since the epoch - * @param lastProvisionedTime Last Provisioned time in milliseconds since the epoch + * @param createdTime Created time as an Instant + * @param lastUpdatedTime Last Updated time as an Instant + * @param lastProvisionedTime Last Provisioned time as an Instant */ public Template( String name, @@ -126,13 +136,13 @@ public static class Builder { /** * Empty Constructor for the Builder object */ - public Builder() {} + private Builder() {} /** * Construct a Builder from an existing template * @param t The existing template to copy */ - public Builder(Template t) { + private Builder(Template t) { this.name = t.name(); this.description = t.description(); this.useCase = t.useCase(); @@ -283,12 +293,29 @@ public Template build() { } } + /** + * Instantiate a new Template builder + * @return a new builder instance + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Instantiate a new Template builder initialized from an existing template + * @param t The existing template to use as the source + * @return a new builder instance initialized from the existing template + */ + public static Builder builder(Template t) { + return new Builder(t); + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { XContentBuilder xContentBuilder = builder.startObject(); - xContentBuilder.field(NAME_FIELD, this.name); - xContentBuilder.field(DESCRIPTION_FIELD, this.description); - xContentBuilder.field(USE_CASE_FIELD, this.useCase); + xContentBuilder.field(NAME_FIELD, this.name.trim()); + xContentBuilder.field(DESCRIPTION_FIELD, this.description == null ? 
"" : this.description.trim()); + xContentBuilder.field(USE_CASE_FIELD, this.useCase == null ? "" : this.useCase.trim()); if (this.templateVersion != null || !this.compatibilityVersion.isEmpty()) { xContentBuilder.startObject(VERSION_FIELD); @@ -334,6 +361,35 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return xContentBuilder.endObject(); } + /** + * Merges two templates by updating the fields from an existing template with the (non-null) fields of another one. + * @param existingTemplate An existing complete template. + * @param templateWithNewFields A template containing only fields to update. The fields must correspond to the field names in {@link #UPDATE_FIELD_ALLOWLIST}. + * @return the updated template. + */ + public static Template updateExistingTemplate(Template existingTemplate, Template templateWithNewFields) { + Builder builder = Template.builder(existingTemplate).lastUpdatedTime(Instant.now()); + if (templateWithNewFields.name() != null) { + builder.name(templateWithNewFields.name()); + } + if (!Strings.isBlank(templateWithNewFields.description())) { + builder.description(templateWithNewFields.description()); + } + if (!Strings.isBlank(templateWithNewFields.useCase())) { + builder.useCase(templateWithNewFields.useCase()); + } + if (templateWithNewFields.templateVersion() != null) { + builder.templateVersion(templateWithNewFields.templateVersion()); + } + if (!templateWithNewFields.compatibilityVersion().isEmpty()) { + builder.compatibilityVersion(templateWithNewFields.compatibilityVersion()); + } + if (templateWithNewFields.getUiMetadata() != null) { + builder.uiMetadata(templateWithNewFields.getUiMetadata()); + } + return builder.build(); + } + /** * Parse raw xContent into a Template instance. 
* @@ -342,9 +398,21 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws * @throws IOException if content can't be parsed correctly */ public static Template parse(XContentParser parser) throws IOException { + return parse(parser, false); + } + + /** + * Parse raw xContent into a Template instance. + * + * @param parser xContent based content parser + * @param fieldUpdate if set true, will be used for updating an existing template + * @return an instance of the template + * @throws IOException if content can't be parsed correctly + */ + public static Template parse(XContentParser parser, boolean fieldUpdate) throws IOException { String name = null; - String description = ""; - String useCase = ""; + String description = null; + String useCase = null; Version templateVersion = null; List compatibilityVersion = new ArrayList<>(); Map workflows = new HashMap<>(); @@ -357,6 +425,12 @@ public static Template parse(XContentParser parser) throws IOException { ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); while (parser.nextToken() != XContentParser.Token.END_OBJECT) { String fieldName = parser.currentName(); + if (fieldUpdate && !UPDATE_FIELD_ALLOWLIST.contains(fieldName)) { + throw new FlowFrameworkException( + "You can not update the field [" + fieldName + "] without updating the whole template.", + RestStatus.BAD_REQUEST + ); + } parser.nextToken(); switch (fieldName) { case NAME_FIELD: @@ -421,8 +495,16 @@ public static Template parse(XContentParser parser) throws IOException { ); } } - if (name == null) { - throw new FlowFrameworkException("A template object requires a name.", RestStatus.BAD_REQUEST); + if (!fieldUpdate) { + if (name == null) { + throw new FlowFrameworkException("A template object requires a name.", RestStatus.BAD_REQUEST); + } + if (description == null) { + description = ""; + } + if (useCase == null) { + useCase = ""; + } } return new Builder().name(name) diff --git 
a/src/main/java/org/opensearch/flowframework/model/WorkflowState.java b/src/main/java/org/opensearch/flowframework/model/WorkflowState.java index 6b1593b6e..6a4b81a55 100644 --- a/src/main/java/org/opensearch/flowframework/model/WorkflowState.java +++ b/src/main/java/org/opensearch/flowframework/model/WorkflowState.java @@ -126,6 +126,15 @@ public static Builder builder() { return new Builder(); } + /** + * Constructs a builder object for workflowState from an existing state + * @param existingState a WorkflowState object to initialize the builder with + * @return Builder Object initialized with existing state + */ + public static Builder builder(WorkflowState existingState) { + return new Builder(existingState); + } + /** * Class for constructing a Builder for WorkflowState */ @@ -143,7 +152,23 @@ public static class Builder { /** * Empty Constructor for the Builder object */ - public Builder() {} + private Builder() {} + + /** + * Builder from existing state + * @param existingState a WorkflowState object to initialize the builder with + */ + private Builder(WorkflowState existingState) { + this.workflowId = existingState.getWorkflowId(); + this.error = existingState.getError(); + this.state = existingState.getState(); + this.provisioningProgress = existingState.getProvisioningProgress(); + this.provisionStartTime = existingState.getProvisionStartTime(); + this.provisionEndTime = existingState.getProvisionEndTime(); + this.user = existingState.getUser(); + this.userOutputs = existingState.userOutputs(); + this.resourcesCreated = existingState.resourcesCreated(); + } /** * Builder method for adding workflowID @@ -254,6 +279,44 @@ public WorkflowState build() { } } + /** + * Merges two workflow states by updating the fields from an existing state with the (non-null) fields of another one. + * @param existingState An existing Workflow state. + * @param stateWithNewFields A workflow state containing only fields to update. + * @return the updated workflow state. 
+ */ + public static WorkflowState updateExistingWorkflowState(WorkflowState existingState, WorkflowState stateWithNewFields) { + Builder builder = WorkflowState.builder(existingState); + if (stateWithNewFields.getWorkflowId() != null) { + builder.workflowId(stateWithNewFields.getWorkflowId()); + } + if (stateWithNewFields.getError() != null) { + builder.error(stateWithNewFields.getError()); + } + if (stateWithNewFields.getState() != null) { + builder.state(stateWithNewFields.getState()); + } + if (stateWithNewFields.getProvisioningProgress() != null) { + builder.provisioningProgress(stateWithNewFields.getProvisioningProgress()); + } + if (stateWithNewFields.getProvisionStartTime() != null) { + builder.provisionStartTime(stateWithNewFields.getProvisionStartTime()); + } + if (stateWithNewFields.getProvisionEndTime() != null) { + builder.provisionEndTime(stateWithNewFields.getProvisionEndTime()); + } + if (stateWithNewFields.getUser() != null) { + builder.user(stateWithNewFields.getUser()); + } + if (stateWithNewFields.userOutputs() != null) { + builder.userOutputs(stateWithNewFields.userOutputs()); + } + if (stateWithNewFields.resourcesCreated() != null) { + builder.resourcesCreated(stateWithNewFields.resourcesCreated()); + } + return builder.build(); + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { XContentBuilder xContentBuilder = builder.startObject(); @@ -492,7 +555,7 @@ public Map userOutputs() { } /** - * A map of all the resources created + * A list of all the resources created * @return the resources created */ public List resourcesCreated() { diff --git a/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java index a27761a7d..16bc6182c 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java +++ 
b/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java @@ -40,6 +40,7 @@ import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW; import static org.opensearch.flowframework.common.CommonValue.REPROVISION_WORKFLOW; +import static org.opensearch.flowframework.common.CommonValue.UPDATE_WORKFLOW_FIELDS; import static org.opensearch.flowframework.common.CommonValue.USE_CASE; import static org.opensearch.flowframework.common.CommonValue.VALIDATION; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID; @@ -85,6 +86,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli String[] validation = request.paramAsStringArray(VALIDATION, new String[] { "all" }); boolean provision = request.paramAsBoolean(PROVISION_WORKFLOW, false); boolean reprovision = request.paramAsBoolean(REPROVISION_WORKFLOW, false); + boolean updateFields = request.paramAsBoolean(UPDATE_WORKFLOW_FIELDS, false); String useCase = request.param(USE_CASE); // If provisioning, consume all other params and pass to provision transport action @@ -120,11 +122,23 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS)) ); } + if (provision && updateFields) { + // Consume params and content so custom exception is processed + params.keySet().stream().forEach(request::param); + request.content(); + FlowFrameworkException ffe = new FlowFrameworkException( + "You can not use both the " + PROVISION_WORKFLOW + " and " + UPDATE_WORKFLOW_FIELDS + " parameters in the same request.", + RestStatus.BAD_REQUEST + ); + return channel -> channel.sendResponse( + new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS)) + ); + } try { - Template template; Map 
useCaseDefaultsMap = Collections.emptyMap(); if (useCase != null) { + // Reconstruct the template from a substitution-ready use case String useCaseTemplateFileInStringFormat = ParseUtils.resourceToString( "/" + DefaultUseCases.getSubstitutionReadyFileByUseCaseName(useCase) ); @@ -181,21 +195,25 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli null, useCaseDefaultsMap ); - XContentParser parserTestJson = ParseUtils.jsonToParser(useCaseTemplateFileInStringFormat); - ensureExpectedToken(XContentParser.Token.START_OBJECT, parserTestJson.currentToken(), parserTestJson); - template = Template.parse(parserTestJson); - + XContentParser useCaseParser = ParseUtils.jsonToParser(useCaseTemplateFileInStringFormat); + ensureExpectedToken(XContentParser.Token.START_OBJECT, useCaseParser.currentToken(), useCaseParser); + template = Template.parse(useCaseParser); } else { XContentParser parser = request.contentParser(); ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); - template = Template.parse(parser); + template = Template.parse(parser, updateFields); + } + + // If not provisioning, params map is empty. 
Use it to pass updateFields flag to WorkflowRequest + if (updateFields) { + params = Map.of(UPDATE_WORKFLOW_FIELDS, "true"); } WorkflowRequest workflowRequest = new WorkflowRequest( workflowId, template, validation, - provision, + provision || updateFields, params, useCase, useCaseDefaultsMap, diff --git a/src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java index 0d0bf8b64..d75255b8d 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java @@ -25,9 +25,12 @@ import org.opensearch.rest.RestRequest; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Locale; +import java.util.Map; +import static org.opensearch.flowframework.common.CommonValue.ALLOW_DELETE; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI; import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED; @@ -57,6 +60,7 @@ public String getName() { @Override protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { String workflowId = request.param(WORKFLOW_ID); + String allowDelete = request.param(ALLOW_DELETE); try { if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) { throw new FlowFrameworkException( @@ -73,7 +77,11 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request if (workflowId == null) { throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST); } - WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null); + WorkflowRequest workflowRequest = new WorkflowRequest( + workflowId, + null, + allowDelete == null ? 
Collections.emptyMap() : Map.of(ALLOW_DELETE, allowDelete) + ); return channel -> client.execute(DeprovisionWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> { XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS); diff --git a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java index e4f4dfc6a..3114ee7d5 100644 --- a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java @@ -233,6 +233,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { - // Regular update, reset provisioning status - flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc( - request.getWorkflowId(), - Map.ofEntries( - Map.entry(STATE_FIELD, State.NOT_STARTED), - Map.entry(PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.NOT_STARTED) - ), - ActionListener.wrap(updateResponse -> { - logger.info( - "updated workflow {} state to {}", - request.getWorkflowId(), - State.NOT_STARTED.name() - ); - listener.onResponse(new WorkflowResponse(request.getWorkflowId())); - }, exception -> { - String errorMessage = "Failed to update workflow " - + request.getWorkflowId() - + " in template index"; - logger.error(errorMessage, exception); - if (exception instanceof FlowFrameworkException) { - listener.onFailure(exception); - } else { - listener.onFailure( - new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)) + // Regular update, reset provisioning status, ignore state index if updating fields + if (!isFieldUpdate) { + flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc( + request.getWorkflowId(), + Map.ofEntries( + Map.entry(STATE_FIELD, State.NOT_STARTED), + Map.entry(PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.NOT_STARTED) + 
), + ActionListener.wrap(updateResponse -> { + logger.info( + "updated workflow {} state to {}", + request.getWorkflowId(), + State.NOT_STARTED.name() ); - } - }) - ); + listener.onResponse(new WorkflowResponse(request.getWorkflowId())); + }, exception -> { + String errorMessage = "Failed to update workflow " + + request.getWorkflowId() + + " in template index"; + logger.error(errorMessage, exception); + if (exception instanceof FlowFrameworkException) { + listener.onFailure(exception); + } else { + listener.onFailure( + new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)) + ); + } + }) + ); + } else { + listener.onResponse(new WorkflowResponse(request.getWorkflowId())); + } }, exception -> { String errorMessage = "Failed to update use case template " + request.getWorkflowId(); logger.error(errorMessage, exception); @@ -313,7 +321,8 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener listener) { String workflowId = request.getWorkflowId(); + String allowDelete = request.getParams().get(ALLOW_DELETE); GetWorkflowStateRequest getStateRequest = new GetWorkflowStateRequest(workflowId, true); // Stash thread context to interact with system index @@ -103,9 +108,17 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { context.restore(); + Set deleteAllowedResources = Strings.tokenizeByCommaToSet(allowDelete); // Retrieve resources from workflow state and deprovision threadPool.executor(DEPROVISION_WORKFLOW_THREAD_POOL) - .execute(() -> executeDeprovisionSequence(workflowId, response.getWorkflowState().resourcesCreated(), listener)); + .execute( + () -> executeDeprovisionSequence( + workflowId, + response.getWorkflowState().resourcesCreated(), + deleteAllowedResources, + listener + ) + ); }, exception -> { String errorMessage = "Failed to get workflow state for workflow " + workflowId; logger.error(errorMessage, exception); @@ -121,18 +134,20 @@ protected void doExecute(Task task, WorkflowRequest 
request, ActionListener resourcesCreated, + Set deleteAllowedResources, ActionListener listener ) { - + List deleteNotAllowed = new ArrayList<>(); // Create a list of ProcessNodes with the corresponding deprovision workflow steps List deprovisionProcessSequence = new ArrayList<>(); for (ResourceCreated resource : resourcesCreated) { String workflowStepId = resource.workflowStepId(); String stepName = resource.workflowStepName(); - String deprovisionStep = getDeprovisionStepByWorkflowStep(stepName); - // Unimplemented steps presently return null, so skip - if (deprovisionStep == null) { + WorkflowStep deprovisionStep = workflowStepFactory.createStep(getDeprovisionStepByWorkflowStep(stepName)); + // Skip if the step requires allow_delete but the resourceId isn't included + if (deprovisionStep.allowDeleteRequired() && !deleteAllowedResources.contains(resource.resourceId())) { + deleteNotAllowed.add(resource); continue; } // New ID is old ID with (deprovision step type) prepended @@ -140,7 +155,7 @@ private void executeDeprovisionSequence( deprovisionProcessSequence.add( new ProcessNode( deprovisionStepId, - workflowStepFactory.createStep(deprovisionStep), + deprovisionStep, Collections.emptyMap(), Collections.emptyMap(), new WorkflowData(Map.of(getResourceByWorkflowStep(stepName), resource.resourceId()), workflowId, deprovisionStepId), @@ -215,17 +230,21 @@ private void executeDeprovisionSequence( List remainingResources = deprovisionProcessSequence.stream() .map(pn -> getResourceFromDeprovisionNode(pn, resourcesCreated)) .collect(Collectors.toList()); - logger.info("Resources remaining: {}", remainingResources); - updateWorkflowState(workflowId, remainingResources, listener); + logger.info("Resources remaining: {}.", remainingResources); + if (!deleteNotAllowed.isEmpty()) { + logger.info("Resources requiring allow_delete: {}.", deleteNotAllowed); + } + updateWorkflowState(workflowId, remainingResources, deleteNotAllowed, listener); } private void updateWorkflowState( 
String workflowId, List remainingResources, + List deleteNotAllowed, ActionListener listener ) { - if (remainingResources.isEmpty()) { - // Successful deprovision, reset state to initial + if (remainingResources.isEmpty() && deleteNotAllowed.isEmpty()) { + // Successful deprovision of all resources, reset state to initial flowFrameworkIndicesHandler.doesTemplateExist(workflowId, templateExists -> { if (Boolean.TRUE.equals(templateExists)) { flowFrameworkIndicesHandler.putInitialStateToWorkflowState( @@ -244,35 +263,49 @@ private void updateWorkflowState( listener.onResponse(new WorkflowResponse(workflowId)); }, listener); } else { - // Failed deprovision + // Remaining resources only includes ones we tried to delete + List stateIndexResources = new ArrayList<>(remainingResources); + // Add in those we skipped + stateIndexResources.addAll(deleteNotAllowed); flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc( workflowId, Map.ofEntries( Map.entry(STATE_FIELD, State.COMPLETED), Map.entry(PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.DONE), Map.entry(PROVISION_END_TIME_FIELD, Instant.now().toEpochMilli()), - Map.entry(RESOURCES_CREATED_FIELD, remainingResources) + Map.entry(RESOURCES_CREATED_FIELD, stateIndexResources) ), ActionListener.wrap(updateResponse -> { logger.info("updated workflow {} state to COMPLETED", workflowId); }, exception -> { logger.error("Failed to update workflow {} state", workflowId, exception); }) ); // give user list of remaining resources + StringBuilder message = new StringBuilder(); + appendResourceInfo(message, "Failed to deprovision some resources: ", remainingResources); + appendResourceInfo(message, "These resources require the " + ALLOW_DELETE + " parameter to deprovision: ", deleteNotAllowed); listener.onFailure( - new FlowFrameworkException( - "Failed to deprovision some resources: [" - + remainingResources.stream() - .map(DeprovisionWorkflowTransportAction::getResourceNameAndId) - .filter(Objects::nonNull) - .distinct() 
- .collect(Collectors.joining(", ")) - + "].", - RestStatus.ACCEPTED - ) + new FlowFrameworkException(message.toString(), remainingResources.isEmpty() ? RestStatus.FORBIDDEN : RestStatus.ACCEPTED) ); } } + private static void appendResourceInfo(StringBuilder message, String prefix, List resources) { + if (!resources.isEmpty()) { + if (message.length() > 0) { + message.append(" "); + } + message.append(prefix) + .append( + resources.stream() + .map(DeprovisionWorkflowTransportAction::getResourceNameAndId) + .filter(Objects::nonNull) + .distinct() + .collect(Collectors.joining(", ", "[", "]")) + ) + .append("."); + } + } + private static ResourceCreated getResourceFromDeprovisionNode(ProcessNode deprovisionNode, List resourcesCreated) { return resourcesCreated.stream() .filter(r -> deprovisionNode.id().equals("(deprovision_" + r.workflowStepName() + ") " + r.workflowStepId())) diff --git a/src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateResponse.java b/src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateResponse.java index 6f8b9e14b..517126c2b 100644 --- a/src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateResponse.java +++ b/src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateResponse.java @@ -48,7 +48,8 @@ public GetWorkflowStateResponse(WorkflowState workflowState, boolean allStatus) if (allStatus) { this.workflowState = workflowState; } else { - this.workflowState = new WorkflowState.Builder().workflowId(workflowState.getWorkflowId()) + this.workflowState = WorkflowState.builder() + .workflowId(workflowState.getWorkflowId()) .error(workflowState.getError()) .state(workflowState.getState()) .resourcesCreated(workflowState.resourcesCreated()) diff --git a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java index b1a4cc5da..44eb07df4 100644 --- 
a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java @@ -146,7 +146,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener params, String useCase, Map defaultParams, @@ -128,11 +135,12 @@ public WorkflowRequest( this.workflowId = workflowId; this.template = template; this.validation = validation; - this.provision = provision; - if (!provision && !params.isEmpty()) { + this.provision = provisionOrUpdate && !params.containsKey(UPDATE_WORKFLOW_FIELDS); + this.updateFields = !provision && Boolean.parseBoolean(params.get(UPDATE_WORKFLOW_FIELDS)); + if (!this.provision && params.keySet().stream().anyMatch(k -> !UPDATE_WORKFLOW_FIELDS.equals(k))) { throw new IllegalArgumentException("Params may only be included when provisioning."); } - this.params = params; + this.params = this.updateFields ? Collections.emptyMap() : params; this.useCase = useCase; this.defaultParams = defaultParams; this.reprovision = reprovision; @@ -149,8 +157,13 @@ public WorkflowRequest(StreamInput in) throws IOException { String templateJson = in.readOptionalString(); this.template = templateJson == null ? null : Template.parse(templateJson); this.validation = in.readStringArray(); - this.provision = in.readBoolean(); - this.params = this.provision ? in.readMap(StreamInput::readString, StreamInput::readString) : Collections.emptyMap(); + boolean provisionOrUpdate = in.readBoolean(); + this.params = provisionOrUpdate ? 
in.readMap(StreamInput::readString, StreamInput::readString) : Collections.emptyMap(); + this.provision = provisionOrUpdate && !params.containsKey(UPDATE_WORKFLOW_FIELDS); + this.updateFields = !provision && Boolean.parseBoolean(params.get(UPDATE_WORKFLOW_FIELDS)); + if (this.updateFields) { + this.params = Collections.emptyMap(); + } this.reprovision = in.readBoolean(); } @@ -188,6 +201,14 @@ public boolean isProvision() { return this.provision; } + /** + * Gets the update fields flag + * @return the update fields boolean + */ + public boolean isUpdateFields() { + return this.updateFields; + } + /** * Gets the params map * @return the params map @@ -226,9 +247,11 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(workflowId); out.writeOptionalString(template == null ? null : template.toJson()); out.writeStringArray(validation); - out.writeBoolean(provision); + out.writeBoolean(provision || updateFields); if (provision) { out.writeMap(params, StreamOutput::writeString, StreamOutput::writeString); + } else if (updateFields) { + out.writeMap(Map.of(UPDATE_WORKFLOW_FIELDS, "true"), StreamOutput::writeString, StreamOutput::writeString); } out.writeBoolean(reprovision); } diff --git a/src/main/java/org/opensearch/flowframework/util/EncryptorUtils.java b/src/main/java/org/opensearch/flowframework/util/EncryptorUtils.java index 3ba26a386..02daeaff0 100644 --- a/src/main/java/org/opensearch/flowframework/util/EncryptorUtils.java +++ b/src/main/java/org/opensearch/flowframework/util/EncryptorUtils.java @@ -169,7 +169,7 @@ private Template processTemplateCredentials(Template template, Function REQUIRED_INPUTS = Set.of(INDEX_NAME); + /** Optional input keys */ + public static final Set OPTIONAL_INPUTS = Collections.emptySet(); + /** Provided output keys */ + public static final Set PROVIDED_OUTPUTS = Set.of(INDEX_NAME); + + /** + * Instantiate this class + * + * @param client Client to delete an index + */ + public DeleteIndexStep(Client 
client) { + this.client = client; + } + + @Override + public PlainActionFuture execute( + String currentNodeId, + WorkflowData currentNodeInputs, + Map outputs, + Map previousNodeInputs, + Map params + ) { + PlainActionFuture deleteIndexFuture = PlainActionFuture.newFuture(); + + try { + Map inputs = ParseUtils.getInputsFromPreviousSteps( + REQUIRED_INPUTS, + OPTIONAL_INPUTS, + currentNodeInputs, + outputs, + previousNodeInputs, + params + ); + + String indexName = (String) inputs.get(INDEX_NAME); + + DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName); + + client.admin().indices().delete(deleteIndexRequest, ActionListener.wrap(acknowledgedResponse -> { + logger.info("Deleted index: {}", indexName); + deleteIndexFuture.onResponse( + new WorkflowData( + Map.ofEntries(Map.entry(INDEX_NAME, indexName)), + currentNodeInputs.getWorkflowId(), + currentNodeInputs.getNodeId() + ) + ); + }, ex -> { + Exception e = getSafeException(ex); + String errorMessage = (e == null ? "Failed to delete the index " + indexName : e.getMessage()); + logger.error(errorMessage, e); + deleteIndexFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); + })); + } catch (Exception e) { + deleteIndexFuture.onFailure(e); + } + return deleteIndexFuture; + } + + @Override + public String getName() { + return NAME; + } + + @Override + public boolean allowDeleteRequired() { + return true; + } +} diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeleteIngestPipelineStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeleteIngestPipelineStep.java new file mode 100644 index 000000000..473b1655c --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/workflow/DeleteIngestPipelineStep.java @@ -0,0 +1,108 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * 
compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; +import org.opensearch.action.ingest.DeletePipelineRequest; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.client.Client; +import org.opensearch.core.action.ActionListener; +import org.opensearch.flowframework.exception.WorkflowStepException; +import org.opensearch.flowframework.util.ParseUtils; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +import static org.opensearch.flowframework.common.WorkflowResources.PIPELINE_ID; +import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException; + +/** + * Step to delete an ingest pipeline + */ +public class DeleteIngestPipelineStep implements WorkflowStep { + + private static final Logger logger = LogManager.getLogger(DeleteIngestPipelineStep.class); + private final Client client; + + /** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */ + public static final String NAME = "delete_ingest_pipeline"; + /** Required input keys */ + public static final Set REQUIRED_INPUTS = Set.of(PIPELINE_ID); + /** Optional input keys */ + public static final Set OPTIONAL_INPUTS = Collections.emptySet(); + /** Provided output keys */ + public static final Set PROVIDED_OUTPUTS = Set.of(PIPELINE_ID); + + /** + * Instantiate this class + * + * @param client Client to delete an Ingest Pipeline + */ + public DeleteIngestPipelineStep(Client client) { + this.client = client; + } + + @Override + public PlainActionFuture execute( + String currentNodeId, + WorkflowData currentNodeInputs, + Map outputs, + Map previousNodeInputs, + Map params + ) { + PlainActionFuture deletePipelineFuture = PlainActionFuture.newFuture(); + + try { + Map inputs = ParseUtils.getInputsFromPreviousSteps( + REQUIRED_INPUTS, + 
OPTIONAL_INPUTS, + currentNodeInputs, + outputs, + previousNodeInputs, + params + ); + + String pipelineId = (String) inputs.get(PIPELINE_ID); + + DeletePipelineRequest deletePipelineRequest = new DeletePipelineRequest(pipelineId); + + client.admin().cluster().deletePipeline(deletePipelineRequest, ActionListener.wrap(acknowledgedResponse -> { + logger.info("Deleted IngestPipeline: {}", pipelineId); + deletePipelineFuture.onResponse( + new WorkflowData( + Map.ofEntries(Map.entry(PIPELINE_ID, pipelineId)), + currentNodeInputs.getWorkflowId(), + currentNodeInputs.getNodeId() + ) + ); + }, ex -> { + Exception e = getSafeException(ex); + String errorMessage = (e == null ? "Failed to delete the ingest pipeline " + pipelineId : e.getMessage()); + logger.error(errorMessage, e); + deletePipelineFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); + })); + } catch (Exception e) { + deletePipelineFuture.onFailure(e); + } + return deletePipelineFuture; + } + + @Override + public String getName() { + return NAME; + } + + @Override + public boolean allowDeleteRequired() { + return true; + } +} diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeleteSearchPipelineStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeleteSearchPipelineStep.java new file mode 100644 index 000000000..a391ef20d --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/workflow/DeleteSearchPipelineStep.java @@ -0,0 +1,108 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ +package org.opensearch.flowframework.workflow; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; +import org.opensearch.action.search.DeleteSearchPipelineRequest; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.client.Client; +import org.opensearch.core.action.ActionListener; +import org.opensearch.flowframework.exception.WorkflowStepException; +import org.opensearch.flowframework.util.ParseUtils; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +import static org.opensearch.flowframework.common.WorkflowResources.PIPELINE_ID; +import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException; + +/** + * Step to delete a search pipeline + */ +public class DeleteSearchPipelineStep implements WorkflowStep { + + private static final Logger logger = LogManager.getLogger(DeleteSearchPipelineStep.class); + private final Client client; + + /** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */ + public static final String NAME = "delete_search_pipeline"; + /** Required input keys */ + public static final Set REQUIRED_INPUTS = Set.of(PIPELINE_ID); + /** Optional input keys */ + public static final Set OPTIONAL_INPUTS = Collections.emptySet(); + /** Provided output keys */ + public static final Set PROVIDED_OUTPUTS = Set.of(PIPELINE_ID); + + /** + * Instantiate this class + * + * @param client Client to delete a Search Pipeline + */ + public DeleteSearchPipelineStep(Client client) { + this.client = client; + } + + @Override + public PlainActionFuture execute( + String currentNodeId, + WorkflowData currentNodeInputs, + Map outputs, + Map previousNodeInputs, + Map params + ) { + PlainActionFuture deleteSearchPipelineFuture = PlainActionFuture.newFuture(); + + try { + Map inputs = ParseUtils.getInputsFromPreviousSteps( + REQUIRED_INPUTS, + OPTIONAL_INPUTS, + 
currentNodeInputs, + outputs, + previousNodeInputs, + params + ); + + String pipelineId = (String) inputs.get(PIPELINE_ID); + + DeleteSearchPipelineRequest deleteSearchPipelineRequest = new DeleteSearchPipelineRequest(pipelineId); + + client.admin().cluster().deleteSearchPipeline(deleteSearchPipelineRequest, ActionListener.wrap(acknowledgedResponse -> { + logger.info("Deleted SearchPipeline: {}", pipelineId); + deleteSearchPipelineFuture.onResponse( + new WorkflowData( + Map.ofEntries(Map.entry(PIPELINE_ID, pipelineId)), + currentNodeInputs.getWorkflowId(), + currentNodeInputs.getNodeId() + ) + ); + }, ex -> { + Exception e = getSafeException(ex); + String errorMessage = (e == null ? "Failed to delete the search pipeline " + pipelineId : e.getMessage()); + logger.error(errorMessage, e); + deleteSearchPipelineFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); + })); + } catch (Exception e) { + deleteSearchPipelineFuture.onFailure(e); + } + return deleteSearchPipelineFuture; + } + + @Override + public String getName() { + return NAME; + } + + @Override + public boolean allowDeleteRequired() { + return true; + } +} diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java index 92dd1c8e0..08457885f 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java @@ -10,8 +10,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.client.Client; -import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.rest.RestStatus; import org.opensearch.flowframework.common.FlowFrameworkSettings; @@ -59,33 +57,32 @@ public class WorkflowProcessSorter { private static final Logger logger = 
LogManager.getLogger(WorkflowProcessSorter.class); + /** Workflow step types which may not be used in a template */ + public static final Set WORKFLOW_STEP_DENYLIST = Set.of( + DeleteIndexStep.NAME, + DeleteIngestPipelineStep.NAME, + DeleteSearchPipelineStep.NAME + ); + private WorkflowStepFactory workflowStepFactory; private ThreadPool threadPool; private Integer maxWorkflowSteps; - private ClusterService clusterService; - private Client client; /** * Instantiate this class. * * @param workflowStepFactory The factory which matches template step types to instances. * @param threadPool The OpenSearch Thread pool to pass to process nodes. - * @param clusterService The OpenSearch cluster service. - * @param client The OpenSearch Client * @param flowFrameworkSettings settings of the plugin */ public WorkflowProcessSorter( WorkflowStepFactory workflowStepFactory, ThreadPool threadPool, - ClusterService clusterService, - Client client, FlowFrameworkSettings flowFrameworkSettings ) { this.workflowStepFactory = workflowStepFactory; this.threadPool = threadPool; this.maxWorkflowSteps = flowFrameworkSettings.getMaxWorkflowSteps(); - this.clusterService = clusterService; - this.client = client; } /** @@ -110,6 +107,15 @@ public List sortProcessNodes(Workflow workflow, String workflowId, RestStatus.BAD_REQUEST ); } + // Disallow some steps + for (WorkflowNode node : workflow.nodes()) { + if (WORKFLOW_STEP_DENYLIST.contains(node.type())) { + throw new FlowFrameworkException( + "The step type [" + node.type() + "] for node [" + node.id() + "] can not be used in a workflow.", + RestStatus.FORBIDDEN + ); + } + } List sortedNodes = topologicalSort(workflow.nodes(), workflow.edges()); List nodes = new ArrayList<>(); diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java index ebc8be094..0456e5bbf 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java +++ 
b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java @@ -39,4 +39,12 @@ PlainActionFuture execute( * @return the name of this workflow step. */ String getName(); + + /** + * For steps which delete data, override this method to require the resource ID to be specified on the rest path to deprovision it + * @return true if the resource ID must be specified for deprovisioning + */ + default boolean allowDeleteRequired() { + return false; + } } diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java index a5bebce83..9fc8baada 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java @@ -22,13 +22,15 @@ import org.opensearch.ml.client.MachineLearningNodeClient; import org.opensearch.threadpool.ThreadPool; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.opensearch.flowframework.common.CommonValue.ACTIONS_FIELD; import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS; @@ -86,6 +88,7 @@ public WorkflowStepFactory( ) { stepMap.put(NoOpStep.NAME, NoOpStep::new); stepMap.put(CreateIndexStep.NAME, () -> new CreateIndexStep(client, flowFrameworkIndicesHandler)); + stepMap.put(DeleteIndexStep.NAME, () -> new DeleteIndexStep(client)); stepMap.put(ReindexStep.NAME, () -> new ReindexStep(client, flowFrameworkIndicesHandler)); stepMap.put( RegisterLocalCustomModelStep.NAME, @@ -113,25 +116,35 @@ public WorkflowStepFactory( stepMap.put(RegisterAgentStep.NAME, () -> new RegisterAgentStep(mlClient, flowFrameworkIndicesHandler)); stepMap.put(DeleteAgentStep.NAME, () -> new 
DeleteAgentStep(mlClient)); stepMap.put(CreateIngestPipelineStep.NAME, () -> new CreateIngestPipelineStep(client, flowFrameworkIndicesHandler)); + stepMap.put(DeleteIngestPipelineStep.NAME, () -> new DeleteIngestPipelineStep(client)); stepMap.put(CreateSearchPipelineStep.NAME, () -> new CreateSearchPipelineStep(client, flowFrameworkIndicesHandler)); stepMap.put(UpdateIngestPipelineStep.NAME, () -> new UpdateIngestPipelineStep(client)); stepMap.put(UpdateSearchPipelineStep.NAME, () -> new UpdateSearchPipelineStep(client)); stepMap.put(UpdateIndexStep.NAME, () -> new UpdateIndexStep(client)); + stepMap.put(DeleteSearchPipelineStep.NAME, () -> new DeleteSearchPipelineStep(client)); } /** * Enum encapsulating the different step names, their inputs, outputs, required plugin and timeout of the step */ - public enum WorkflowSteps { /** Noop Step */ - NOOP("noop", Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), null), + NOOP(NoOpStep.NAME, Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), null), /** Create Index Step */ CREATE_INDEX(CreateIndexStep.NAME, List.of(INDEX_NAME, CONFIGURATIONS), List.of(INDEX_NAME), Collections.emptyList(), null), + /** Delete Index Step */ + DELETE_INDEX( + DeleteIndexStep.NAME, + DeleteIndexStep.REQUIRED_INPUTS, // TODO: Copy this pattern to other steps, see + DeleteIndexStep.PROVIDED_OUTPUTS, // https://github.com/opensearch-project/flow-framework/issues/535 + Collections.emptyList(), + null + ), + /** Create ReIndex Step */ REINDEX(ReindexStep.NAME, List.of(SOURCE_INDEX, DESTINATION_INDEX), List.of(ReindexStep.NAME), Collections.emptyList(), null), @@ -229,6 +242,15 @@ public enum WorkflowSteps { null ), + /** Delete Ingest Pipeline Step */ + DELETE_INGEST_PIPELINE( + DeleteIngestPipelineStep.NAME, + DeleteIngestPipelineStep.REQUIRED_INPUTS, + DeleteIngestPipelineStep.PROVIDED_OUTPUTS, + Collections.emptyList(), + null + ), + /** Create Search Pipeline Step */ CREATE_SEARCH_PIPELINE( 
CreateSearchPipelineStep.NAME, @@ -257,7 +279,16 @@ public enum WorkflowSteps { ), /** Update Index Step */ - UPDATE_INDEX(UpdateIndexStep.NAME, List.of(INDEX_NAME, CONFIGURATIONS), List.of(INDEX_NAME), Collections.emptyList(), null); + UPDATE_INDEX(UpdateIndexStep.NAME, List.of(INDEX_NAME, CONFIGURATIONS), List.of(INDEX_NAME), Collections.emptyList(), null), + + /** Delete Search Pipeline Step */ + DELETE_SEARCH_PIPELINE( + DeleteSearchPipelineStep.NAME, + DeleteSearchPipelineStep.REQUIRED_INPUTS, + DeleteSearchPipelineStep.PROVIDED_OUTPUTS, + Collections.emptyList(), + null + ); private final String workflowStepName; private final List inputs; @@ -265,7 +296,13 @@ public enum WorkflowSteps { private final List requiredPlugins; private final TimeValue timeout; - WorkflowSteps(String workflowStepName, List inputs, List outputs, List requiredPlugins, TimeValue timeout) { + WorkflowSteps( + String workflowStepName, + Collection inputs, + Collection outputs, + List requiredPlugins, + TimeValue timeout + ) { this.workflowStepName = workflowStepName; this.inputs = List.copyOf(inputs); this.outputs = List.copyOf(outputs); @@ -404,13 +441,11 @@ public static List getInputByWorkflowType(String workflowStep) throws Fl * @return WorkflowValidator */ public WorkflowValidator getWorkflowValidator() { - Map workflowStepValidators = new HashMap<>(); - - for (WorkflowSteps mapping : WorkflowSteps.values()) { - workflowStepValidators.put(mapping.getWorkflowStepName(), mapping.getWorkflowStepValidator()); - } - - return new WorkflowValidator(workflowStepValidators); + return new WorkflowValidator( + Stream.of(WorkflowSteps.values()) + .filter(w -> !WorkflowProcessSorter.WORKFLOW_STEP_DENYLIST.contains(w.getWorkflowStepName())) + .collect(Collectors.toMap(WorkflowSteps::getWorkflowStepName, WorkflowSteps::getWorkflowStepValidator)) + ); } /** @@ -419,22 +454,20 @@ public WorkflowValidator getWorkflowValidator() { * @return WorkflowValidator */ public WorkflowValidator 
getWorkflowValidatorByStep(List steps) { - Map workflowStepValidators = new HashMap<>(); - Set invalidSteps = new HashSet<>(steps); - - for (WorkflowSteps mapping : WorkflowSteps.values()) { - String step = mapping.getWorkflowStepName(); - if (steps.contains(step)) { - workflowStepValidators.put(mapping.getWorkflowStepName(), mapping.getWorkflowStepValidator()); - invalidSteps.remove(step); - } - } - + Set validSteps = Stream.of(WorkflowSteps.values()) + .map(WorkflowSteps::getWorkflowStepName) + .filter(name -> !WorkflowProcessSorter.WORKFLOW_STEP_DENYLIST.contains(name)) + .filter(steps::contains) + .collect(Collectors.toSet()); + Set invalidSteps = steps.stream().filter(name -> !validSteps.contains(name)).collect(Collectors.toSet()); if (!invalidSteps.isEmpty()) { throw new FlowFrameworkException("Invalid step name: " + invalidSteps, RestStatus.BAD_REQUEST); } - - return new WorkflowValidator(workflowStepValidators); + return new WorkflowValidator( + Stream.of(WorkflowSteps.values()) + .filter(w -> validSteps.contains(w.getWorkflowStepName())) + .collect(Collectors.toMap(WorkflowSteps::getWorkflowStepName, WorkflowSteps::getWorkflowStepValidator)) + ); } /** diff --git a/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java b/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java index 401ddbe9a..86224ca26 100644 --- a/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java +++ b/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java @@ -16,11 +16,13 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.env.Environment; +import org.opensearch.indices.SystemIndexDescriptor; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; +import java.util.Collection; import java.util.Set; import java.util.concurrent.TimeUnit; 
import java.util.stream.Collectors; @@ -86,6 +88,9 @@ public void testPlugin() throws IOException { assertEquals(9, ffp.getActions().size()); assertEquals(3, ffp.getExecutorBuilders(settings).size()); assertEquals(5, ffp.getSettings().size()); + + Collection systemIndexDescriptors = ffp.getSystemIndexDescriptors(Settings.EMPTY); + assertEquals(3, systemIndexDescriptors.size()); } } } diff --git a/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java b/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java index 922c26b0f..dc25f44fa 100644 --- a/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java +++ b/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java @@ -434,6 +434,25 @@ protected Response updateWorkflow(RestClient client, String workflowId, Template ); } + /** + * Helper method to invoke the Update Workflow API + * @param client the rest client + * @param workflowId the document id + * @param templateFields the JSON containing some template fields + * @throws Exception if the request fails + * @return a rest response + */ + protected Response updateWorkflowWithFields(RestClient client, String workflowId, String templateFields) throws Exception { + return TestHelpers.makeRequest( + client, + "PUT", + String.format(Locale.ROOT, "%s/%s?update_fields=true", WORKFLOW_URI, workflowId), + Collections.emptyMap(), + templateFields, + null + ); + } + /** * Helper method to invoke the Provision Workflow Rest Action * @param client the rest client @@ -470,6 +489,24 @@ protected Response deprovisionWorkflow(RestClient client, String workflowId) thr ); } + /** + * Helper method to invoke the Deprovision Workflow Rest Action + * @param client the rest client + * @param workflowId the workflow ID to deprovision + * @return a rest response + * @throws Exception if the request fails + */ + protected Response deprovisionWorkflowWithAllowDelete(RestClient client, String workflowId, String 
allowedResource) throws Exception { + return TestHelpers.makeRequest( + client, + "POST", + String.format(Locale.ROOT, "%s/%s/%s%s", WORKFLOW_URI, workflowId, "_deprovision?allow_delete=", allowedResource), + Collections.emptyMap(), + "", + null + ); + } + /** * Helper method to invoke the Delete Workflow Rest Action * @param client the rest client diff --git a/src/test/java/org/opensearch/flowframework/model/TemplateTests.java b/src/test/java/org/opensearch/flowframework/model/TemplateTests.java index 63dbf31f6..d8550682b 100644 --- a/src/test/java/org/opensearch/flowframework/model/TemplateTests.java +++ b/src/test/java/org/opensearch/flowframework/model/TemplateTests.java @@ -9,7 +9,11 @@ package org.opensearch.flowframework.model; import org.opensearch.Version; +import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.DeprecationHandler; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentParser; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.test.OpenSearchTestCase; @@ -19,6 +23,8 @@ import java.util.List; import java.util.Map; +import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; + public class TemplateTests extends OpenSearchTestCase { private String expectedTemplate = @@ -84,6 +90,59 @@ public void testTemplate() throws IOException { assertEquals(now, template.lastUpdatedTime()); assertNull(template.lastProvisionedTime()); assertEquals("Workflow [userParams={key=value}, nodes=[A, B], edges=[A->B]]", wfX.toString()); + + // Test invalid field if updating + XContentParser parser = JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.IGNORE_DEPRECATIONS, + json + ); + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + assertThrows(FlowFrameworkException.class, () -> 
Template.parse(parser, true)); + } + + public void testUpdateExistingTemplate() { + // Time travel to guarantee update increments + Instant now = Instant.now().minusMillis(100); + + Template original = new Template( + "name one", + "description one", + "use case one", + Version.fromString("1.1.1"), + List.of(Version.fromString("1.1.1"), Version.fromString("1.1.1")), + Collections.emptyMap(), + Map.of("uiMetadata", "one"), + null, + now, + now, + null + ); + Template updated = Template.builder().name("name two").description("description two").useCase("use case two").build(); + Template merged = Template.updateExistingTemplate(original, updated); + assertEquals("name two", merged.name()); + assertEquals("description two", merged.description()); + assertEquals("use case two", merged.useCase()); + assertEquals("1.1.1", merged.templateVersion().toString()); + assertEquals("1.1.1", merged.compatibilityVersion().get(0).toString()); + assertEquals("1.1.1", merged.compatibilityVersion().get(1).toString()); + assertEquals("one", merged.getUiMetadata().get("uiMetadata")); + + updated = Template.builder() + .templateVersion(Version.fromString("2.2.2")) + .compatibilityVersion(List.of(Version.fromString("2.2.2"), Version.fromString("2.2.2"))) + .uiMetadata(Map.of("uiMetadata", "two")) + .build(); + merged = Template.updateExistingTemplate(original, updated); + assertEquals("name one", merged.name()); + assertEquals("description one", merged.description()); + assertEquals("use case one", merged.useCase()); + assertEquals("2.2.2", merged.templateVersion().toString()); + assertEquals("2.2.2", merged.compatibilityVersion().get(0).toString()); + assertEquals("2.2.2", merged.compatibilityVersion().get(1).toString()); + assertEquals("two", merged.getUiMetadata().get("uiMetadata")); + + assertTrue(merged.lastUpdatedTime().isAfter(now)); } public void testExceptions() throws IOException { @@ -106,4 +165,23 @@ public void testStrings() throws IOException { 
assertTrue(t.toYaml().contains("a test template")); assertTrue(t.toString().contains("a test template")); } + + public void testNullToEmptyString() throws IOException { + Template t = Template.parse("{\"name\":\"test\"}"); + assertEquals("test", t.name()); + assertEquals("", t.description()); + assertEquals("", t.useCase()); + + XContentParser parser = JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.IGNORE_DEPRECATIONS, + "{\"name\":\"test\"}" + ); + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + t = Template.parse(parser, true); + String json = t.toJson(); + assertTrue(json.contains("\"name\":\"test\"")); + assertTrue(json.contains("\"description\":\"\"")); + assertTrue(json.contains("\"use_case\":\"\"")); + } } diff --git a/src/test/java/org/opensearch/flowframework/model/WorkflowStateTests.java b/src/test/java/org/opensearch/flowframework/model/WorkflowStateTests.java index 04c3655d3..ca7873036 100644 --- a/src/test/java/org/opensearch/flowframework/model/WorkflowStateTests.java +++ b/src/test/java/org/opensearch/flowframework/model/WorkflowStateTests.java @@ -90,4 +90,77 @@ public void testWorkflowState() throws IOException { } } + public void testWorkflowStateUpdate() { + // Time travel to guarantee update increments + Instant now = Instant.now().minusMillis(100); + + WorkflowState wfs = WorkflowState.builder() + .workflowId("1") + .error("error one") + .state("state one") + .provisioningProgress("progress one") + .provisionStartTime(now) + .provisionEndTime(now) + .user(new User("one", Collections.emptyList(), Collections.emptyList(), Collections.emptyList())) + .userOutputs(Map.of("output", "one")) + .resourcesCreated(List.of(new ResourceCreated("", "", "", "id one"))) + .build(); + + assertEquals("1", wfs.getWorkflowId()); + assertEquals("error one", wfs.getError()); + assertEquals("state one", wfs.getState()); + assertEquals("progress one", wfs.getProvisioningProgress()); + 
assertEquals(now, wfs.getProvisionStartTime()); + assertEquals(now, wfs.getProvisionEndTime()); + assertEquals("one", wfs.getUser().getName()); + assertEquals(1, wfs.userOutputs().size()); + assertEquals("one", wfs.userOutputs().get("output")); + assertEquals(1, wfs.resourcesCreated().size()); + ResourceCreated rc = wfs.resourcesCreated().get(0); + assertEquals("id one", rc.resourceId()); + + WorkflowState update = WorkflowState.builder() + .workflowId("2") + .error("error two") + .state("state two") + .provisioningProgress("progress two") + .user(new User("two", Collections.emptyList(), Collections.emptyList(), Collections.emptyList())) + .build(); + + wfs = WorkflowState.updateExistingWorkflowState(wfs, update); + assertEquals("2", wfs.getWorkflowId()); + assertEquals("error two", wfs.getError()); + assertEquals("state two", wfs.getState()); + assertEquals("progress two", wfs.getProvisioningProgress()); + assertEquals(now, wfs.getProvisionStartTime()); + assertEquals(now, wfs.getProvisionEndTime()); + assertEquals("two", wfs.getUser().getName()); + assertEquals(1, wfs.userOutputs().size()); + assertEquals("one", wfs.userOutputs().get("output")); + assertEquals(1, wfs.resourcesCreated().size()); + rc = wfs.resourcesCreated().get(0); + assertEquals("id one", rc.resourceId()); + + now = Instant.now().minusMillis(100); + update = WorkflowState.builder() + .provisionStartTime(now) + .provisionEndTime(now) + .userOutputs(Map.of("output", "two")) + .resourcesCreated(List.of(wfs.resourcesCreated().get(0), new ResourceCreated("", "", "", "id two"))) + .build(); + + wfs = WorkflowState.updateExistingWorkflowState(wfs, update); + assertEquals("2", wfs.getWorkflowId()); + assertEquals("error two", wfs.getError()); + assertEquals("state two", wfs.getState()); + assertEquals("progress two", wfs.getProvisioningProgress()); + assertEquals(now, wfs.getProvisionStartTime()); + assertEquals(now, wfs.getProvisionEndTime()); + assertEquals("two", wfs.getUser().getName()); + 
assertEquals(1, wfs.userOutputs().size()); + assertEquals("two", wfs.userOutputs().get("output")); + assertEquals(2, wfs.resourcesCreated().size()); + rc = wfs.resourcesCreated().get(1); + assertEquals("id two", rc.resourceId()); + } } diff --git a/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java b/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java index 19cb3d718..e685e07b9 100644 --- a/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java +++ b/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java @@ -11,6 +11,7 @@ import org.opensearch.client.Client; import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; +import org.opensearch.flowframework.workflow.WorkflowProcessSorter; import org.opensearch.flowframework.workflow.WorkflowStepFactory; import org.opensearch.flowframework.workflow.WorkflowStepFactory.WorkflowSteps; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -46,63 +47,7 @@ public void testParseWorkflowValidator() throws IOException { WorkflowValidator validator = new WorkflowValidator(workflowStepValidators); - assertEquals(18, validator.getWorkflowStepValidators().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("create_connector")); - assertEquals(7, validator.getWorkflowStepValidators().get("create_connector").getInputs().size()); - assertEquals(1, validator.getWorkflowStepValidators().get("create_connector").getOutputs().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("delete_model")); - assertEquals(1, validator.getWorkflowStepValidators().get("delete_model").getInputs().size()); - assertEquals(1, validator.getWorkflowStepValidators().get("delete_model").getOutputs().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("deploy_model")); - assertEquals(1, 
validator.getWorkflowStepValidators().get("deploy_model").getInputs().size()); - assertEquals(1, validator.getWorkflowStepValidators().get("deploy_model").getOutputs().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("register_remote_model")); - assertEquals(2, validator.getWorkflowStepValidators().get("register_remote_model").getInputs().size()); - assertEquals(2, validator.getWorkflowStepValidators().get("register_remote_model").getOutputs().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("register_model_group")); - assertEquals(1, validator.getWorkflowStepValidators().get("register_model_group").getInputs().size()); - assertEquals(2, validator.getWorkflowStepValidators().get("register_model_group").getOutputs().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("register_local_custom_model")); - assertEquals(9, validator.getWorkflowStepValidators().get("register_local_custom_model").getInputs().size()); - assertEquals(2, validator.getWorkflowStepValidators().get("register_local_custom_model").getOutputs().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("register_local_sparse_encoding_model")); - assertEquals(3, validator.getWorkflowStepValidators().get("register_local_sparse_encoding_model").getInputs().size()); - assertEquals(5, validator.getWorkflowStepValidators().get("register_local_sparse_encoding_model").getOutputs().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("register_local_pretrained_model")); - assertEquals(3, validator.getWorkflowStepValidators().get("register_local_pretrained_model").getInputs().size()); - assertEquals(2, validator.getWorkflowStepValidators().get("register_local_pretrained_model").getOutputs().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("undeploy_model")); - assertEquals(1, 
validator.getWorkflowStepValidators().get("undeploy_model").getInputs().size()); - assertEquals(1, validator.getWorkflowStepValidators().get("undeploy_model").getOutputs().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("delete_connector")); - assertEquals(1, validator.getWorkflowStepValidators().get("delete_connector").getInputs().size()); - assertEquals(1, validator.getWorkflowStepValidators().get("delete_connector").getOutputs().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("register_agent")); - assertEquals(2, validator.getWorkflowStepValidators().get("register_agent").getInputs().size()); - assertEquals(1, validator.getWorkflowStepValidators().get("register_agent").getOutputs().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("delete_agent")); - assertEquals(1, validator.getWorkflowStepValidators().get("delete_agent").getInputs().size()); - assertEquals(1, validator.getWorkflowStepValidators().get("delete_agent").getOutputs().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("create_tool")); - assertEquals(1, validator.getWorkflowStepValidators().get("create_tool").getInputs().size()); - assertEquals(1, validator.getWorkflowStepValidators().get("create_tool").getOutputs().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("noop")); - assertEquals(0, validator.getWorkflowStepValidators().get("noop").getInputs().size()); - assertEquals(0, validator.getWorkflowStepValidators().get("noop").getOutputs().size()); + assertEquals(21, validator.getWorkflowStepValidators().size()); } public void testWorkflowStepFactoryHasValidators() throws IOException { @@ -126,10 +71,16 @@ public void testWorkflowStepFactoryHasValidators() throws IOException { // Get all registered workflow step types in the workflow step factory List registeredWorkflowStepTypes = new ArrayList(workflowStepFactory.getStepMap().keySet()); + 
registeredWorkflowStepTypes.removeAll(WorkflowProcessSorter.WORKFLOW_STEP_DENYLIST); // Check if each registered step has a corresponding validator definition assertTrue(registeredWorkflowStepTypes.containsAll(registeredWorkflowValidatorTypes)); assertTrue(registeredWorkflowValidatorTypes.containsAll(registeredWorkflowStepTypes)); - } + // Check JSON + String json = workflowValidator.toJson(); + for (String step : registeredWorkflowStepTypes) { + assertTrue(json.contains("\"" + step + "\"")); + } + } } diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java index 6dfad72f1..6a454dc75 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java @@ -9,8 +9,6 @@ package org.opensearch.flowframework.rest; import org.apache.hc.core5.http.io.entity.EntityUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.opensearch.action.ingest.GetPipelineResponse; import org.opensearch.action.search.SearchResponse; import org.opensearch.client.Response; @@ -50,8 +48,6 @@ import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID; public class FlowFrameworkRestApiIT extends FlowFrameworkRestTestCase { - private static final Logger logger = LogManager.getLogger(FlowFrameworkRestApiIT.class); - private static AtomicBoolean waitToStart = new AtomicBoolean(true); @Before @@ -116,7 +112,45 @@ public void testFailedUpdateWorkflow() throws Exception { assertTrue( exceptionProvisioned.getMessage().contains("The template can not be updated unless its provisioning state is NOT_STARTED") ); + } + + public void testUpdateWorkflowUsingFields() throws Exception { + Template template = TestHelpers.createTemplateFromFile("createconnector-registerremotemodel-deploymodel.json"); + Response response = 
createWorkflow(client(), template); + assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); + Map responseMap = entityAsMap(response); + String workflowId = (String) responseMap.get(WORKFLOW_ID); + + // Ensure Ml config index is initialized as creating a connector requires this, then hit Provision API and assert status + Response provisionResponse; + if (!indexExistsWithAdminClient(".plugins-ml-config")) { + assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 40, TimeUnit.SECONDS); + provisionResponse = provisionWorkflow(client(), workflowId); + } else { + provisionResponse = provisionWorkflow(client(), workflowId); + } + assertEquals(RestStatus.OK, TestHelpers.restStatus(provisionResponse)); + getAndAssertWorkflowStatus(client(), workflowId, State.PROVISIONING, ProvisioningProgress.IN_PROGRESS); + + // Attempt to update with update_fields with illegal field + // Fails because contains workflow field + ResponseException exceptionProvisioned = expectThrows( + ResponseException.class, + () -> updateWorkflowWithFields(client(), workflowId, "{\"workflows\":{}}") + ); + assertTrue( + exceptionProvisioned.getMessage().contains("You can not update the field [workflows] without updating the whole template.") + ); + // Change just the name and description + response = updateWorkflowWithFields(client(), workflowId, "{\"name\":\"foo\",\"description\":\"bar\"}"); + assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); + // Get the updated template + response = getWorkflow(client(), workflowId); + assertEquals(RestStatus.OK.getStatus(), response.getStatusLine().getStatusCode()); + Template t = Template.parse(EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8)); + assertEquals("foo", t.name()); + assertEquals("bar", t.description()); } public void testCreateAndProvisionLocalModelWorkflow() throws Exception { @@ -139,7 +173,7 @@ public void testCreateAndProvisionLocalModelWorkflow() throws Exception { ) 
.collect(Collectors.toList()); Workflow missingInputs = new Workflow(originalWorkflow.userParams(), modifiednodes, originalWorkflow.edges()); - Template templateWithMissingInputs = new Template.Builder(template).workflows(Map.of(PROVISION_WORKFLOW, missingInputs)).build(); + Template templateWithMissingInputs = Template.builder(template).workflows(Map.of(PROVISION_WORKFLOW, missingInputs)).build(); // Hit Create Workflow API with invalid template Response response = createWorkflow(client(), templateWithMissingInputs); @@ -202,7 +236,7 @@ public void testCreateAndProvisionCyclicalTemplate() throws Exception { List.of(new WorkflowEdge("workflow_step_2", "workflow_step_3"), new WorkflowEdge("workflow_step_3", "workflow_step_2")) ); - Template cyclicalTemplate = new Template.Builder(template).workflows(Map.of(PROVISION_WORKFLOW, cyclicalWorkflow)).build(); + Template cyclicalTemplate = Template.builder(template).workflows(Map.of(PROVISION_WORKFLOW, cyclicalWorkflow)).build(); // Hit dry run ResponseException exception = expectThrows(ResponseException.class, () -> createWorkflowValidation(client(), cyclicalTemplate)); @@ -409,7 +443,7 @@ public void testCreateAndProvisionIngestAndSearchPipeline() throws Exception { "create_ingest_pipeline" ); - List workflowStepNames = resourcesCreated.stream() + List workflowStepNames = resourcesCreated.stream() .peek(resourceCreated -> assertNotNull(resourceCreated.resourceId())) .map(ResourceCreated::workflowStepName) .collect(Collectors.toList()); @@ -457,7 +491,7 @@ public void testDefaultCohereUseCase() throws Exception { List expectedStepNames = List.of("create_connector", "register_remote_model", "deploy_model"); - List workflowStepNames = resourcesCreated.stream() + List workflowStepNames = resourcesCreated.stream() .peek(resourceCreated -> assertNotNull(resourceCreated.resourceId())) .map(ResourceCreated::workflowStepName) .collect(Collectors.toList()); @@ -527,7 +561,7 @@ public void testAllDefaultUseCasesCreation() throws 
Exception { public void testSemanticSearchWithLocalModelEndToEnd() throws Exception { // Checking if plugins are part of the integration test cluster so we can continue with this test List plugins = catPlugins(); - if (!plugins.contains("opensearch-knn") && plugins.contains("opensearch-neural-search")) { + if (!plugins.contains("opensearch-knn") && !plugins.contains("opensearch-neural-search")) { return; } Map defaults = new HashMap<>(); @@ -548,6 +582,7 @@ public void testSemanticSearchWithLocalModelEndToEnd() throws Exception { // This template should create 4 resources, registered model_id, deployed model_id, ingest pipeline, and index name assertEquals(4, resourcesCreated.size()); String modelId = resourcesCreated.get(1).resourceId(); + String pipelineId = resourcesCreated.get(2).resourceId(); String indexName = resourcesCreated.get(3).resourceId(); // Short wait before ingesting data @@ -556,36 +591,49 @@ public void testSemanticSearchWithLocalModelEndToEnd() throws Exception { String docContent = "{\"passage_text\": \"Hello planet\"\n}"; ingestSingleDoc(docContent, indexName); // Short wait before neural search - Thread.sleep(500); + Thread.sleep(3000); SearchResponse neuralSearchResponse = neuralSearchRequest(indexName, modelId); - assertEquals(neuralSearchResponse.getHits().getHits().length, 1); + assertNotNull(neuralSearchResponse); Thread.sleep(500); - deleteIndex(indexName); - // Hit Deprovision API - // By design, this may not completely deprovision the first time if it takes >2s to process removals - Response deprovisionResponse = deprovisionWorkflow(client(), workflowId); + // Hit Deprovision API using allow_delete but only for the pipeline + Response deprovisionResponse = null; try { + // By design, this may not completely deprovision the first time if it takes >2s to process removals + deprovisionResponse = deprovisionWorkflowWithAllowDelete(client(), workflowId, pipelineId); assertBusy( () -> { getAndAssertWorkflowStatus(client(), workflowId, 
State.NOT_STARTED, ProvisioningProgress.NOT_STARTED); }, 30, TimeUnit.SECONDS ); + } catch (ResponseException re) { + // 403 return if completed with only index remaining to delete + assertEquals(RestStatus.FORBIDDEN, TestHelpers.restStatus(re.getResponse())); } catch (ComparisonFailure e) { // 202 return if still processing + assertNotNull(deprovisionResponse); assertEquals(RestStatus.ACCEPTED, TestHelpers.restStatus(deprovisionResponse)); } - if (TestHelpers.restStatus(deprovisionResponse) == RestStatus.ACCEPTED) { + if (deprovisionResponse != null && TestHelpers.restStatus(deprovisionResponse) == RestStatus.ACCEPTED) { // Short wait before we try again Thread.sleep(10000); - deprovisionResponse = deprovisionWorkflow(client(), workflowId); - assertBusy( - () -> { getAndAssertWorkflowStatus(client(), workflowId, State.NOT_STARTED, ProvisioningProgress.NOT_STARTED); }, - 30, - TimeUnit.SECONDS - ); + // Expected failure since we haven't provided allow_delete param + try { + deprovisionResponse = deprovisionWorkflowWithAllowDelete(client(), workflowId, pipelineId); + } catch (ResponseException re) { + // Expected 403 return with only index remaining to delete + assertEquals(RestStatus.FORBIDDEN, TestHelpers.restStatus(re.getResponse())); + } } + // Now try again with allow_delete for the index + deprovisionResponse = deprovisionWorkflowWithAllowDelete(client(), workflowId, pipelineId + "," + indexName); + assertBusy( + () -> { getAndAssertWorkflowStatus(client(), workflowId, State.NOT_STARTED, ProvisioningProgress.NOT_STARTED); }, + 30, + TimeUnit.SECONDS + ); assertEquals(RestStatus.OK, TestHelpers.restStatus(deprovisionResponse)); + // Hit Delete API Response deleteResponse = deleteWorkflow(client(), workflowId); assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse)); diff --git a/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java b/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java index 
b55d6b1f2..7e537566c 100644 --- a/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java +++ b/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java @@ -36,6 +36,7 @@ import static org.opensearch.flowframework.common.CommonValue.CREATE_CONNECTOR_CREDENTIAL_KEY; import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW; +import static org.opensearch.flowframework.common.CommonValue.UPDATE_WORKFLOW_FIELDS; import static org.opensearch.flowframework.common.CommonValue.USE_CASE; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI; import static org.mockito.ArgumentMatchers.any; @@ -47,6 +48,7 @@ public class RestCreateWorkflowActionTests extends OpenSearchTestCase { private String validTemplate; private String invalidTemplate; + private String validUpdateTemplate; private RestCreateWorkflowAction createWorkflowRestAction; private String createWorkflowPath; private String updateWorkflowPath; @@ -82,9 +84,11 @@ public void setUp() throws Exception { null ); - // Invalid template configuration, wrong field name this.validTemplate = template.toJson(); + // Invalid template configuration, wrong field name this.invalidTemplate = this.validTemplate.replace("use_case", "invalid"); + // Partial update of some fields + this.validUpdateTemplate = "{\"description\":\"new description\",\"ui_metadata\":{\"foo\":\"bar\"}}"; this.createWorkflowRestAction = new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting); this.createWorkflowPath = String.format(Locale.ROOT, "%s", WORKFLOW_URI); this.updateWorkflowPath = String.format(Locale.ROOT, "%s/{%s}", WORKFLOW_URI, "workflow_id"); @@ -137,6 +141,78 @@ public void testCreateWorkflowRequestWithParamsButNoProvision() throws Exception ); } + public void testCreateWorkflowRequestWithUpdateAndProvision() throws Exception { + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST) + 
.withPath(this.createWorkflowPath) + .withParams(Map.ofEntries(Map.entry(PROVISION_WORKFLOW, "true"), Map.entry(UPDATE_WORKFLOW_FIELDS, "true"))) + .withContent(new BytesArray(validTemplate), MediaTypeRegistry.JSON) + .build(); + FakeRestChannel channel = new FakeRestChannel(request, false, 1); + createWorkflowRestAction.handleRequest(request, channel, nodeClient); + assertEquals(RestStatus.BAD_REQUEST, channel.capturedResponse().status()); + assertTrue( + channel.capturedResponse() + .content() + .utf8ToString() + .contains( + "You can not use both the " + PROVISION_WORKFLOW + " and " + UPDATE_WORKFLOW_FIELDS + " parameters in the same request." + ) + ); + } + + public void testCreateWorkflowRequestWithUpdateAndParams() throws Exception { + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST) + .withPath(this.createWorkflowPath) + .withParams(Map.ofEntries(Map.entry(UPDATE_WORKFLOW_FIELDS, "true"), Map.entry("foo", "bar"))) + .withContent(new BytesArray(validTemplate), MediaTypeRegistry.JSON) + .build(); + FakeRestChannel channel = new FakeRestChannel(request, false, 1); + createWorkflowRestAction.handleRequest(request, channel, nodeClient); + assertEquals(RestStatus.BAD_REQUEST, channel.capturedResponse().status()); + assertTrue( + channel.capturedResponse().content().utf8ToString().contains("are permitted unless the provision parameter is set to true.") + ); + } + + public void testUpdateWorkflowRequestWithFullTemplateUpdateAndNoParams() throws Exception { + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT) + .withPath(this.updateWorkflowPath) + .withParams(Map.of(UPDATE_WORKFLOW_FIELDS, "true")) + .withContent(new BytesArray(validTemplate), MediaTypeRegistry.JSON) + .build(); + FakeRestChannel channel = new FakeRestChannel(request, false, 1); + doAnswer(invocation -> { + ActionListener actionListener = invocation.getArgument(2); + 
actionListener.onResponse(new WorkflowResponse("id-123")); + return null; + }).when(nodeClient).execute(any(), any(WorkflowRequest.class), any()); + createWorkflowRestAction.handleRequest(request, channel, nodeClient); + assertEquals(RestStatus.BAD_REQUEST, channel.capturedResponse().status()); + assertTrue( + channel.capturedResponse() + .content() + .utf8ToString() + .contains("You can not update the field [workflows] without updating the whole template.") + ); + } + + public void testUpdateWorkflowRequestWithUpdateTemplateUpdateAndNoParams() throws Exception { + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT) + .withPath(this.updateWorkflowPath) + .withParams(Map.of(UPDATE_WORKFLOW_FIELDS, "true")) + .withContent(new BytesArray(validUpdateTemplate), MediaTypeRegistry.JSON) + .build(); + FakeRestChannel channel = new FakeRestChannel(request, false, 1); + doAnswer(invocation -> { + ActionListener actionListener = invocation.getArgument(2); + actionListener.onResponse(new WorkflowResponse("id-123")); + return null; + }).when(nodeClient).execute(any(), any(WorkflowRequest.class), any()); + createWorkflowRestAction.handleRequest(request, channel, nodeClient); + assertEquals(RestStatus.CREATED, channel.capturedResponse().status()); + assertTrue(channel.capturedResponse().content().utf8ToString().contains("id-123")); + } + public void testCreateWorkflowRequestWithUseCaseButNoProvision() throws Exception { RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST) .withPath(this.createWorkflowPath) diff --git a/src/test/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowActionTests.java b/src/test/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowActionTests.java index e9a2c5a47..0d24b033a 100644 --- a/src/test/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowActionTests.java +++ 
b/src/test/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowActionTests.java @@ -9,8 +9,12 @@ package org.opensearch.flowframework.rest; import org.opensearch.client.node.NodeClient; +import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; import org.opensearch.flowframework.common.FlowFrameworkSettings; +import org.opensearch.flowframework.transport.DeprovisionWorkflowAction; +import org.opensearch.flowframework.transport.WorkflowRequest; +import org.opensearch.flowframework.transport.WorkflowResponse; import org.opensearch.rest.RestHandler.Route; import org.opensearch.rest.RestRequest; import org.opensearch.test.OpenSearchTestCase; @@ -19,9 +23,15 @@ import java.util.List; import java.util.Locale; +import java.util.Map; +import static org.opensearch.flowframework.common.CommonValue.ALLOW_DELETE; +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; public class RestDeprovisionWorkflowActionTests extends OpenSearchTestCase { @@ -37,7 +47,7 @@ public void setUp() throws Exception { flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkSettings.class); when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(true); - this.deprovisionWorkflowRestAction = new RestDeprovisionWorkflowAction(flowFrameworkFeatureEnabledSetting); + this.deprovisionWorkflowRestAction = spy(new RestDeprovisionWorkflowAction(flowFrameworkFeatureEnabledSetting)); this.deprovisionWorkflowPath = String.format(Locale.ROOT, "%s/{%s}/%s", WORKFLOW_URI, "workflow_id", "_deprovision"); this.nodeClient = mock(NodeClient.class); } @@ -55,7 +65,6 @@ public void testRestDeprovisiionWorkflowActionRoutes() { } public void 
testNullWorkflowId() throws Exception { - // Request with no params RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST) .withPath(this.deprovisionWorkflowPath) @@ -69,6 +78,25 @@ public void testNullWorkflowId() throws Exception { assertTrue(channel.capturedResponse().content().utf8ToString().contains("workflow_id cannot be null")); } + public void testAllowDeleteParam() throws Exception { + String allowDeleteParam = "foo,bar"; + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST) + .withPath(this.deprovisionWorkflowPath) + .withParams(Map.ofEntries(Map.entry(WORKFLOW_ID, "workflow_id"), Map.entry(ALLOW_DELETE, allowDeleteParam))) + .build(); + FakeRestChannel channel = new FakeRestChannel(request, true, 1); + doAnswer(invocation -> { + WorkflowRequest workflowRequest = invocation.getArgument(1); + ActionListener responseListener = invocation.getArgument(2); + responseListener.onResponse(new WorkflowResponse(workflowRequest.getParams().get(ALLOW_DELETE))); + return null; + }).when(nodeClient).execute(any(DeprovisionWorkflowAction.class), any(WorkflowRequest.class), any()); + + deprovisionWorkflowRestAction.handleRequest(request, channel, nodeClient); + assertEquals(RestStatus.OK, channel.capturedResponse().status()); + assertTrue(channel.capturedResponse().content().utf8ToString().contains(allowDeleteParam)); + } + public void testFeatureFlagNotEnabled() throws Exception { when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(false); RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST) @@ -76,6 +104,7 @@ public void testFeatureFlagNotEnabled() throws Exception { .build(); FakeRestChannel channel = new FakeRestChannel(request, false, 1); deprovisionWorkflowRestAction.handleRequest(request, channel, nodeClient); + assertEquals(1, channel.errors().get()); 
assertEquals(RestStatus.FORBIDDEN, channel.capturedResponse().status()); assertTrue(channel.capturedResponse().content().utf8ToString().contains("This API is disabled.")); } diff --git a/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java index 11b620f3d..265d1d52d 100644 --- a/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java @@ -48,6 +48,7 @@ import static org.opensearch.action.DocWriteResponse.Result.UPDATED; import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX; +import static org.opensearch.flowframework.common.CommonValue.UPDATE_WORKFLOW_FIELDS; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX; import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID; import static org.opensearch.flowframework.common.WorkflowResources.CREATE_CONNECTOR; @@ -55,6 +56,7 @@ import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID; import static org.opensearch.flowframework.common.WorkflowResources.REGISTER_REMOTE_MODEL; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; @@ -352,7 +354,7 @@ public void testFailedToUpdateWorkflow() { ActionListener responseListener = invocation.getArgument(2); responseListener.onFailure(new Exception("failed")); return null; - }).when(flowFrameworkIndicesHandler).updateTemplateInGlobalContext(any(), any(Template.class), any()); + }).when(flowFrameworkIndicesHandler).updateTemplateInGlobalContext(anyString(), any(Template.class), any(), anyBoolean()); 
createWorkflowTransportAction.doExecute(mock(Task.class), updateWorkflow, listener); ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); @@ -394,7 +396,7 @@ public void testUpdateWorkflow() { ActionListener getListener = invocation.getArgument(1); GetResponse getResponse = mock(GetResponse.class); when(getResponse.isExists()).thenReturn(true); - when(getResponse.getSourceAsString()).thenReturn(new Template.Builder().name("test").build().toJson()); + when(getResponse.getSourceAsString()).thenReturn(Template.builder().name("test").build().toJson()); getListener.onResponse(getResponse); return null; }).when(client).get(any(GetRequest.class), any()); @@ -403,7 +405,7 @@ public void testUpdateWorkflow() { ActionListener responseListener = invocation.getArgument(2); responseListener.onResponse(new IndexResponse(new ShardId(GLOBAL_CONTEXT_INDEX, "", 1), "1", 1L, 1L, 1L, true)); return null; - }).when(flowFrameworkIndicesHandler).updateTemplateInGlobalContext(any(), any(Template.class), any()); + }).when(flowFrameworkIndicesHandler).updateTemplateInGlobalContext(anyString(), any(Template.class), any(), anyBoolean()); doAnswer(invocation -> { ActionListener updateResponseListener = invocation.getArgument(2); @@ -418,6 +420,82 @@ public void testUpdateWorkflow() { assertEquals("1", responseCaptor.getValue().getWorkflowId()); } + public void testUpdateWorkflowWithField() { + @SuppressWarnings("unchecked") + ActionListener listener = mock(ActionListener.class); + WorkflowRequest updateWorkflow = new WorkflowRequest( + "1", + Template.builder().name("new name").description("test").useCase(null).uiMetadata(Map.of("foo", "bar")).build(), + Map.of(UPDATE_WORKFLOW_FIELDS, "true") + ); + + doAnswer(invocation -> { + ActionListener getListener = invocation.getArgument(1); + GetResponse getResponse = mock(GetResponse.class); + when(getResponse.isExists()).thenReturn(true); + when(getResponse.getSourceAsString()).thenReturn(template.toJson()); + 
getListener.onResponse(getResponse); + return null; + }).when(client).get(any(GetRequest.class), any()); + + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(2); + responseListener.onResponse(new IndexResponse(new ShardId(GLOBAL_CONTEXT_INDEX, "", 1), "1", 1L, 1L, 1L, true)); + return null; + }).when(flowFrameworkIndicesHandler).updateTemplateInGlobalContext(anyString(), any(Template.class), any(), anyBoolean()); + + createWorkflowTransportAction.doExecute(mock(Task.class), updateWorkflow, listener); + verify(listener, times(1)).onResponse(any()); + + ArgumentCaptor