diff --git a/CHANGELOG.md b/CHANGELOG.md index bed14e8f3..f7ff09f3e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,9 +19,12 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.17...2.x) ### Features +- Add ApiSpecFetcher for Fetching and Comparing API Specifications ([#651](https://github.com/opensearch-project/flow-framework/issues/651)) - Add optional config field to tool step ([#899](https://github.com/opensearch-project/flow-framework/pull/899)) ### Enhancements +- Incrementally remove resources from workflow state during deprovisioning ([#898](https://github.com/opensearch-project/flow-framework/pull/898)) + ### Bug Fixes ### Infrastructure ### Documentation diff --git a/build.gradle b/build.gradle index 56d6814e1..36bef104e 100644 --- a/build.gradle +++ b/build.gradle @@ -24,7 +24,7 @@ buildscript { opensearch_no_snapshot = opensearch_build.replace("-SNAPSHOT","") System.setProperty('tests.security.manager', 'false') common_utils_version = System.getProperty("common_utils.version", opensearch_build) - + swaggerCoreVersion = "2.2.23" bwcVersionShort = "2.12.0" bwcVersion = bwcVersionShort + ".0" bwcOpenSearchFFDownload = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + bwcVersionShort + '/latest/linux/x64/tar/builds/' + @@ -34,6 +34,10 @@ buildscript { bwcFlowFrameworkPath = bwcFilePath + "flowframework/" isSameMajorVersion = opensearch_version.split("\\.")[0] == bwcVersionShort.split("\\.")[0] + swaggerVersion = "2.1.22" + jacksonVersion = "2.18.0" + swaggerCoreVersion = "2.2.25" + } repositories { @@ -164,21 +168,37 @@ configurations { dependencies { implementation "org.opensearch:opensearch:${opensearch_version}" - implementation 'org.junit.jupiter:junit-jupiter:5.11.1' + implementation 'org.junit.jupiter:junit-jupiter:5.11.2' api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}" api group: 'org.opensearch.client', name: 'opensearch-rest-client', version: "${opensearch_version}" implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.17.0' implementation "org.opensearch:common-utils:${common_utils_version}" implementation "com.amazonaws:aws-encryption-sdk-java:3.0.1" implementation "software.amazon.cryptography:aws-cryptographic-material-providers:1.7.0" - implementation "org.dafny:DafnyRuntime:4.8.0" - implementation "software.amazon.smithy.dafny:conversion:0.1" + implementation "org.dafny:DafnyRuntime:4.8.1" + implementation "software.amazon.smithy.dafny:conversion:0.1.1" implementation 'org.bouncycastle:bcprov-jdk18on:1.78.1' api "org.apache.httpcomponents.core5:httpcore5:5.3" implementation "jakarta.json.bind:jakarta.json.bind-api:3.0.1" implementation "org.glassfish:jakarta.json:2.0.1" implementation "org.eclipse:yasson:3.0.4" implementation "com.google.code.gson:gson:2.11.0" + // Swagger-Parser dependencies for API consistency tests + implementation "io.swagger.core.v3:swagger-models:${swaggerCoreVersion}" + implementation "io.swagger.core.v3:swagger-core:${swaggerCoreVersion}" + implementation "io.swagger.parser.v3:swagger-parser-core:${swaggerVersion}" + implementation "io.swagger.parser.v3:swagger-parser:${swaggerVersion}" + implementation "io.swagger.parser.v3:swagger-parser-v3:${swaggerVersion}" + // Declare and force Jackson dependencies for tests + testImplementation("com.fasterxml.jackson.core:jackson-databind") { + version { strictly("${jacksonVersion}") } + } + testImplementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310") { + version { strictly("${jacksonVersion}") } + } + testImplementation("com.fasterxml.jackson.core:jackson-annotations") { + version { strictly("${jacksonVersion}") } + } // ZipArchive dependencies used for integration tests zipArchive group: 'org.opensearch.plugin', name:'opensearch-ml-plugin', version: "${opensearch_build}" @@ -189,7 +209,7 @@ dependencies { configurations.all { resolutionStrategy { force("com.google.guava:guava:33.3.1-jre") // CVE for 31.1, keep to force transitive dependencies - force("com.fasterxml.jackson.core:jackson-core:2.18.0") // Dependency Jar Hell + force("com.fasterxml.jackson.core:jackson-core:${jacksonVersion}") // Dependency Jar Hell force("org.apache.httpcomponents.core5:httpcore5:5.3") // Dependency Jar Hell } } diff --git a/src/main/java/org/opensearch/flowframework/common/CommonValue.java b/src/main/java/org/opensearch/flowframework/common/CommonValue.java index c1992bdf4..9c88788b3 100644 --- a/src/main/java/org/opensearch/flowframework/common/CommonValue.java +++ b/src/main/java/org/opensearch/flowframework/common/CommonValue.java @@ -235,4 +235,9 @@ private CommonValue() {} public static final String CREATE_INGEST_PIPELINE_MODEL_ID = "create_ingest_pipeline.model_id"; /** The field name for reindex source index substitution */ public static final String REINDEX_SOURCE_INDEX = "reindex.source_index"; + + /**URI for the YAML file of the ML Commons API specification.*/ + public static final String ML_COMMONS_API_SPEC_YAML_URI = + "https://raw.githubusercontent.com/opensearch-project/opensearch-api-specification/refs/heads/main/spec/namespaces/ml.yaml"; + } diff --git a/src/main/java/org/opensearch/flowframework/exception/ApiSpecParseException.java b/src/main/java/org/opensearch/flowframework/exception/ApiSpecParseException.java new file mode 100644 index 000000000..ae77452c7 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/exception/ApiSpecParseException.java @@ -0,0 +1,48 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.exception; + +import org.opensearch.OpenSearchException; + +import java.util.List; + +/** + * Custom exception to be thrown when an error occurs during the parsing of an API specification. + */ +public class ApiSpecParseException extends OpenSearchException { + + /** + * Constructor with message. + * + * @param message The detail message. + */ + public ApiSpecParseException(String message) { + super(message); + } + + /** + * Constructor with message and cause. + * + * @param message The detail message. + * @param cause The cause of the exception. + */ + public ApiSpecParseException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Constructor with message and list of detailed errors. + * + * @param message The detail message. + * @param details The list of errors encountered during the parsing process. + */ + public ApiSpecParseException(String message, List details) { + super(message + ": " + String.join(", ", details)); + } +} diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java index cb2dee56f..f05a162ff 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -10,7 +10,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessageFactory; import org.opensearch.ExceptionsHelper; +import org.opensearch.action.DocWriteRequest.OpType; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.create.CreateIndexResponse; import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest; @@ -693,6 +695,7 @@ public void addResourceToStateIndex( getAndUpdateResourceInStateDocumentWithRetries( workflowId, newResource, + OpType.INDEX, RETRIES, ActionListener.runBefore(listener, context::restore) ); @@ -701,15 +704,41 @@ public void addResourceToStateIndex( } /** - * Performs a get and update of a State Index document adding a new resource with strong consistency and retries + * Removes a resource from the state index, including common exception handling + * @param workflowId The workflow document id in the state index + * @param resourceToDelete The resource to delete + * @param listener the ActionListener for this step to handle completing the future after update + */ + public void deleteResourceFromStateIndex(String workflowId, ResourceCreated resourceToDelete, ActionListener listener) { + if (!doesIndexExist(WORKFLOW_STATE_INDEX)) { + String errorMessage = "Failed to update state for " + workflowId + " due to missing " + WORKFLOW_STATE_INDEX + " index"; + logger.error(errorMessage); + listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.NOT_FOUND)); + } else { + try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { + getAndUpdateResourceInStateDocumentWithRetries( + workflowId, + resourceToDelete, + OpType.DELETE, + RETRIES, + ActionListener.runBefore(listener, context::restore) + ); + } + } + } + + /** + * Performs a get and update of a State Index document adding or removing a resource with strong consistency and retries * @param workflowId The document id to update - * @param newResource The resource to add to the resources created list + * @param resource The resource to add or remove from the resources created list + * @param operation The operation to perform on the resource (INDEX to append to the list or DELETE to remove) * @param retries The number of retries on update version conflicts * @param listener The listener to complete on success or failure */ private void getAndUpdateResourceInStateDocumentWithRetries( String workflowId, - ResourceCreated newResource, + ResourceCreated resource, + OpType operation, int retries, ActionListener listener ) { @@ -721,7 +750,11 @@ private void getAndUpdateResourceInStateDocumentWithRetries( } WorkflowState currentState = WorkflowState.parse(getResponse.getSourceAsString()); List resourcesCreated = new ArrayList<>(currentState.resourcesCreated()); - resourcesCreated.add(newResource); + if (operation == OpType.DELETE) { + resourcesCreated.removeIf(r -> r.resourceMap().equals(resource.resourceMap())); + } else { + resourcesCreated.add(resource); + } XContentBuilder builder = XContentFactory.jsonBuilder(); WorkflowState newState = WorkflowState.builder(currentState).resourcesCreated(resourcesCreated).build(); newState.toXContent(builder, null); @@ -732,41 +765,54 @@ private void getAndUpdateResourceInStateDocumentWithRetries( client.update( updateRequest, ActionListener.wrap( - r -> handleStateUpdateSuccess(workflowId, newResource, listener), - e -> handleStateUpdateException(workflowId, newResource, retries, listener, e) + r -> handleStateUpdateSuccess(workflowId, resource, operation, listener), + e -> handleStateUpdateException(workflowId, resource, operation, retries, listener, e) ) ); - }, ex -> handleStateUpdateException(workflowId, newResource, 0, listener, ex))); + }, ex -> handleStateUpdateException(workflowId, resource, operation, 0, listener, ex))); } - private void handleStateUpdateSuccess(String workflowId, ResourceCreated newResource, ActionListener listener) { + private void handleStateUpdateSuccess( + String workflowId, + ResourceCreated newResource, + OpType operation, + ActionListener listener + ) { String resourceName = newResource.resourceType(); String resourceId = newResource.resourceId(); String nodeId = newResource.workflowStepId(); - logger.info("Updated resources created for {} on step {} with {} {}", workflowId, nodeId, resourceName, resourceId); + logger.info( + "Updated resources created for {} on step {} to {} resource {} {}", + workflowId, + nodeId, + operation.equals(OpType.DELETE) ? "delete" : "add", + resourceName, + resourceId + ); listener.onResponse(new WorkflowData(Map.of(resourceName, resourceId), workflowId, nodeId)); } private void handleStateUpdateException( String workflowId, ResourceCreated newResource, + OpType operation, int retries, ActionListener listener, Exception e ) { if (e instanceof VersionConflictEngineException && retries > 0) { // Retry if we haven't exhausted retries - getAndUpdateResourceInStateDocumentWithRetries(workflowId, newResource, retries - 1, listener); + getAndUpdateResourceInStateDocumentWithRetries(workflowId, newResource, operation, retries - 1, listener); return; } - String errorMessage = "Failed to update workflow state for " - + workflowId - + " on step " - + newResource.workflowStepId() - + " with " - + newResource.resourceType() - + " " - + newResource.resourceId(); + String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage( + "Failed to update workflow state for {} on step {} to {} resource {} {}", + workflowId, + newResource.workflowStepId(), + operation.equals(OpType.DELETE) ? "delete" : "add", + newResource.resourceType(), + newResource.resourceId() + ).getFormattedMessage(); logger.error(errorMessage, e); listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); } diff --git a/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java index 1b58e66db..2b8db025c 100644 --- a/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java @@ -47,6 +47,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.opensearch.flowframework.common.CommonValue.ALLOW_DELETE; @@ -214,19 +215,32 @@ private void executeDeprovisionSequence( // Repeat attempting to delete resources as long as at least one is successful int resourceCount = deprovisionProcessSequence.size(); while (resourceCount > 0) { + PlainActionFuture stateUpdateFuture; Iterator iter = deprovisionProcessSequence.iterator(); - while (iter.hasNext()) { + do { ProcessNode deprovisionNode = iter.next(); ResourceCreated resource = getResourceFromDeprovisionNode(deprovisionNode, resourcesCreated); String resourceNameAndId = getResourceNameAndId(resource); PlainActionFuture deprovisionFuture = deprovisionNode.execute(); + stateUpdateFuture = PlainActionFuture.newFuture(); try { deprovisionFuture.get(); logger.info("Successful {} for {}", deprovisionNode.id(), resourceNameAndId); + // Remove from state index resource list + flowFrameworkIndicesHandler.deleteResourceFromStateIndex(workflowId, resource, stateUpdateFuture); + try { + // Wait at most 1 second for state index update. + stateUpdateFuture.actionGet(1, TimeUnit.SECONDS); + } catch (Exception e) { + // Ignore incremental resource removal failures (or timeouts) as we catch up at the end with remainingResources + } // Remove from list so we don't try again iter.remove(); // Pause briefly before next step Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; } catch (Throwable t) { // If any deprovision fails due to not found, it's a success if (t.getCause() instanceof OpenSearchStatusException @@ -238,7 +252,7 @@ private void executeDeprovisionSequence( logger.info("Failed {} for {}", deprovisionNode.id(), resourceNameAndId); } } - } + } while (iter.hasNext()); if (deprovisionProcessSequence.size() < resourceCount) { // If we've deleted something, decrement and try again if not zero resourceCount = deprovisionProcessSequence.size(); @@ -259,6 +273,7 @@ private void executeDeprovisionSequence( try { Thread.sleep(1000); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); break; } } else { @@ -274,6 +289,7 @@ private void executeDeprovisionSequence( if (!deleteNotAllowed.isEmpty()) { logger.info("Resources requiring allow_delete: {}.", deleteNotAllowed); } + // This is a redundant best-effort backup to the incremental deletion done earlier updateWorkflowState(workflowId, remainingResources, deleteNotAllowed, listener); } diff --git a/src/main/java/org/opensearch/flowframework/util/ApiSpecFetcher.java b/src/main/java/org/opensearch/flowframework/util/ApiSpecFetcher.java new file mode 100644 index 000000000..80be71b65 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/util/ApiSpecFetcher.java @@ -0,0 +1,120 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.util; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.flowframework.exception.ApiSpecParseException; +import org.opensearch.rest.RestRequest; + +import java.util.HashSet; +import java.util.List; + +import io.swagger.v3.oas.models.OpenAPI; +import io.swagger.v3.oas.models.Operation; +import io.swagger.v3.oas.models.PathItem; +import io.swagger.v3.oas.models.media.Content; +import io.swagger.v3.oas.models.media.MediaType; +import io.swagger.v3.oas.models.media.Schema; +import io.swagger.v3.oas.models.parameters.RequestBody; +import io.swagger.v3.parser.OpenAPIV3Parser; +import io.swagger.v3.parser.core.models.ParseOptions; +import io.swagger.v3.parser.core.models.SwaggerParseResult; + +/** + * Utility class for fetching and parsing OpenAPI specifications. + */ +public class ApiSpecFetcher { + private static final Logger logger = LogManager.getLogger(ApiSpecFetcher.class); + private static final ParseOptions PARSE_OPTIONS = new ParseOptions(); + private static final OpenAPIV3Parser OPENAPI_PARSER = new OpenAPIV3Parser(); + + static { + PARSE_OPTIONS.setResolve(true); + PARSE_OPTIONS.setResolveFully(true); + } + + /** + * Parses the OpenAPI specification directly from the URI. + * + * @param apiSpecUri URI to the API specification (can be file path or web URI). + * @return Parsed OpenAPI object. + * @throws ApiSpecParseException If parsing fails. + */ + public static OpenAPI fetchApiSpec(String apiSpecUri) { + logger.info("Parsing API spec from URI: {}", apiSpecUri); + SwaggerParseResult result = OPENAPI_PARSER.readLocation(apiSpecUri, null, PARSE_OPTIONS); + OpenAPI openApi = result.getOpenAPI(); + + if (openApi == null) { + throw new ApiSpecParseException("Unable to parse spec from URI: " + apiSpecUri, result.getMessages()); + } + + return openApi; + } + + /** + * Compares the required fields in the API spec with the required enum parameters. + * + * @param requiredEnumParams List of required parameters from the enum. + * @param apiSpecUri URI of the API spec to fetch and compare. + * @param path The API path to check. + * @param method The HTTP method (POST, GET, etc.). + * @return boolean indicating if the required fields match. + */ + public static boolean compareRequiredFields(List requiredEnumParams, String apiSpecUri, String path, RestRequest.Method method) + throws IllegalArgumentException, ApiSpecParseException { + OpenAPI openAPI = fetchApiSpec(apiSpecUri); + + PathItem pathItem = openAPI.getPaths().get(path); + Content content = getContent(method, pathItem); + MediaType mediaType = content.get(XContentType.JSON.mediaTypeWithoutParameters()); + if (mediaType != null) { + Schema schema = mediaType.getSchema(); + + List requiredApiParams = schema.getRequired(); + if (requiredApiParams != null && !requiredApiParams.isEmpty()) { + return new HashSet<>(requiredEnumParams).equals(new HashSet<>(requiredApiParams)); + } + } + return false; + } + + private static Content getContent(RestRequest.Method method, PathItem pathItem) throws IllegalArgumentException, ApiSpecParseException { + Operation operation; + switch (method) { + case POST: + operation = pathItem.getPost(); + break; + case GET: + operation = pathItem.getGet(); + break; + case PUT: + operation = pathItem.getPut(); + break; + case DELETE: + operation = pathItem.getDelete(); + break; + default: + throw new IllegalArgumentException("Unsupported HTTP method: " + method); + } + + if (operation == null) { + throw new IllegalArgumentException("No operation found for the specified method: " + method); + } + + RequestBody requestBody = operation.getRequestBody(); + if (requestBody == null) { + throw new ApiSpecParseException("No requestBody defined for this operation."); + } + + return requestBody.getContent(); + } +} diff --git a/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java b/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java index 49fb46a7a..3570dccf6 100644 --- a/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java +++ b/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java @@ -817,6 +817,17 @@ protected List getResourcesCreated(RestClient client, String wo TimeUnit.SECONDS ); + return getResourcesCreated(client, workflowId); + } + + /** + * Helper method retrieve any resources created incrementally without waiting for completion + * @param client the rest client + * @param workflowId the workflow id to retrieve resources from + * @return a list of created resources + * @throws Exception if the request fails + */ + protected List getResourcesCreated(RestClient client, String workflowId) throws Exception { Response response = getWorkflowStatus(client, workflowId, true); // Parse workflow state from response and retrieve resources created diff --git a/src/test/java/org/opensearch/flowframework/exception/ApiSpecParseExceptionTests.java b/src/test/java/org/opensearch/flowframework/exception/ApiSpecParseExceptionTests.java new file mode 100644 index 000000000..ab93bd66c --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/exception/ApiSpecParseExceptionTests.java @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.exception; + +import org.opensearch.OpenSearchException; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Arrays; +import java.util.List; + +public class ApiSpecParseExceptionTests extends OpenSearchTestCase { + + public void testApiSpecParseException() { + ApiSpecParseException exception = new ApiSpecParseException("API spec parsing failed"); + assertTrue(exception instanceof OpenSearchException); + assertEquals("API spec parsing failed", exception.getMessage()); + } + + public void testApiSpecParseExceptionWithCause() { + Throwable cause = new RuntimeException("Underlying issue"); + ApiSpecParseException exception = new ApiSpecParseException("API spec parsing failed", cause); + assertTrue(exception instanceof OpenSearchException); + assertEquals("API spec parsing failed", exception.getMessage()); + assertEquals(cause, exception.getCause()); + } + + public void testApiSpecParseExceptionWithDetailedErrors() { + String message = "API spec parsing failed"; + List details = Arrays.asList("Missing required field", "Invalid type"); + ApiSpecParseException exception = new ApiSpecParseException(message, details); + assertTrue(exception instanceof OpenSearchException); + String expectedMessage = "API spec parsing failed: Missing required field, Invalid type"; + assertEquals(expectedMessage, exception.getMessage()); + } + +} diff --git a/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java b/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java index 3c6c4846b..a7dd7f75e 100644 --- a/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java +++ b/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java @@ -551,7 +551,7 @@ public void testDeleteFlowFrameworkSystemIndexDoc() throws IOException { ); } - public void testAddResourceToStateIndex() throws IOException { + public void testAddResourceToStateIndex() { ClusterState mockClusterState = mock(ClusterState.class); Metadata mockMetaData = mock(Metadata.class); when(clusterService.state()).thenReturn(mockClusterState); @@ -607,7 +607,7 @@ public void testAddResourceToStateIndex() throws IOException { ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); verify(listener, times(1)).onFailure(exceptionCaptor.capture()); assertEquals( - "Failed to update workflow state for this_id on step node_id with connector_id this_id", + "Failed to update workflow state for this_id on step node_id to add resource connector_id this_id", exceptionCaptor.getValue().getMessage() ); @@ -652,7 +652,85 @@ public void testAddResourceToStateIndex() throws IOException { ); } - public void testAddResourceToStateIndexWithRetries() throws IOException { + public void testDeleteResourceFromStateIndex() { + ClusterState mockClusterState = mock(ClusterState.class); + Metadata mockMetaData = mock(Metadata.class); + when(clusterService.state()).thenReturn(mockClusterState); + when(mockClusterState.metadata()).thenReturn(mockMetaData); + when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(true); + ResourceCreated resourceToDelete = new ResourceCreated("", "node_id", "connector_id", "this_id"); + + @SuppressWarnings("unchecked") + ActionListener listener = mock(ActionListener.class); + // test success + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + XContentBuilder builder = XContentFactory.jsonBuilder(); + WorkflowState state = WorkflowState.builder().build(); + state.toXContent(builder, null); + BytesReference workflowBytesRef = BytesReference.bytes(builder); + GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, "this_id", 1, 1, 1, true, workflowBytesRef, null, null); + responseListener.onResponse(new GetResponse(getResult)); + return null; + }).when(client).get(any(GetRequest.class), any()); + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onResponse(new UpdateResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "this_id", -2, 0, 0, Result.UPDATED)); + return null; + }).when(client).update(any(UpdateRequest.class), any()); + + flowFrameworkIndicesHandler.deleteResourceFromStateIndex("this_id", resourceToDelete, listener); + + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(WorkflowData.class); + verify(listener, times(1)).onResponse(responseCaptor.capture()); + assertEquals("this_id", responseCaptor.getValue().getContent().get(WorkflowResources.CONNECTOR_ID)); + + // test failure + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(new Exception("Failed to update state")); + return null; + }).when(client).update(any(UpdateRequest.class), any()); + + flowFrameworkIndicesHandler.deleteResourceFromStateIndex("this_id", resourceToDelete, listener); + + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + verify(listener, times(1)).onFailure(exceptionCaptor.capture()); + assertEquals( + "Failed to update workflow state for this_id on step node_id to delete resource connector_id this_id", + exceptionCaptor.getValue().getMessage() + ); + + // test document not found + @SuppressWarnings("unchecked") + ActionListener notFoundListener = mock(ActionListener.class); + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, "this_id", -2, 0, 1, false, null, null, null); + responseListener.onResponse(new GetResponse(getResult)); + return null; + }).when(client).get(any(GetRequest.class), any()); + flowFrameworkIndicesHandler.deleteResourceFromStateIndex("this_id", resourceToDelete, notFoundListener); + + exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + verify(notFoundListener, times(1)).onFailure(exceptionCaptor.capture()); + assertEquals("Workflow state not found for this_id", exceptionCaptor.getValue().getMessage()); + + // test index not found + when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(false); + @SuppressWarnings("unchecked") + ActionListener indexNotFoundListener = mock(ActionListener.class); + flowFrameworkIndicesHandler.deleteResourceFromStateIndex("this_id", resourceToDelete, indexNotFoundListener); + + exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + verify(indexNotFoundListener, times(1)).onFailure(exceptionCaptor.capture()); + assertEquals( + "Failed to update state for this_id due to missing .plugins-flow-framework-state index", + exceptionCaptor.getValue().getMessage() + ); + } + + public void testAddResourceToStateIndexWithRetries() { ClusterState mockClusterState = mock(ClusterState.class); Metadata mockMetaData = mock(Metadata.class); when(clusterService.state()).thenReturn(mockClusterState); @@ -745,7 +823,95 @@ public void testAddResourceToStateIndexWithRetries() throws IOException { ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); verify(threeRetryListener, times(1)).onFailure(exceptionCaptor.capture()); assertEquals( - "Failed to update workflow state for this_id on step node_id with connector_id this_id", + "Failed to update workflow state for this_id on step node_id to add resource connector_id this_id", + exceptionCaptor.getValue().getMessage() + ); + } + + public void testDeleteResourceFromStateIndexWithRetries() { + ClusterState mockClusterState = mock(ClusterState.class); + Metadata mockMetaData = mock(Metadata.class); + when(clusterService.state()).thenReturn(mockClusterState); + when(mockClusterState.metadata()).thenReturn(mockMetaData); + when(mockMetaData.hasIndex(WORKFLOW_STATE_INDEX)).thenReturn(true); + VersionConflictEngineException conflictException = new VersionConflictEngineException( + new ShardId(WORKFLOW_STATE_INDEX, "", 1), + "this_id", + null + ); + UpdateResponse updateResponse = new UpdateResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "this_id", -2, 0, 0, Result.UPDATED); + ResourceCreated resourceToDelete = new ResourceCreated("", "node_id", "connector_id", "this_id"); + + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + XContentBuilder builder = XContentFactory.jsonBuilder(); + WorkflowState state = WorkflowState.builder().build(); + state.toXContent(builder, null); + BytesReference workflowBytesRef = BytesReference.bytes(builder); + GetResult getResult = new GetResult(WORKFLOW_STATE_INDEX, "this_id", 1, 1, 1, true, workflowBytesRef, null, null); + responseListener.onResponse(new GetResponse(getResult)); + return null; + }).when(client).get(any(GetRequest.class), any()); + + // test success on retry + @SuppressWarnings("unchecked") + ActionListener retryListener = mock(ActionListener.class); + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(conflictException); + return null; + }).doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onResponse(updateResponse); + return null; + }).when(client).update(any(UpdateRequest.class), any()); + + flowFrameworkIndicesHandler.deleteResourceFromStateIndex("this_id", resourceToDelete, retryListener); + + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(WorkflowData.class); + verify(retryListener, times(1)).onResponse(responseCaptor.capture()); + assertEquals("this_id", responseCaptor.getValue().getContent().get(WorkflowResources.CONNECTOR_ID)); + + // test failure on 6th after 5 retries even if 7th would have been success + @SuppressWarnings("unchecked") + ActionListener threeRetryListener = mock(ActionListener.class); + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(conflictException); + return null; + }).doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(conflictException); + return null; + }).doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(conflictException); + return null; + }).doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(conflictException); + return null; + }).doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(conflictException); + return null; + }).doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(conflictException); + return null; + }).doAnswer(invocation -> { + // we'll never get here + ActionListener responseListener = invocation.getArgument(1); + responseListener.onResponse(updateResponse); + return null; + }).when(client).update(any(UpdateRequest.class), any()); + + flowFrameworkIndicesHandler.deleteResourceFromStateIndex("this_id", resourceToDelete, threeRetryListener); + + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + verify(threeRetryListener, times(1)).onFailure(exceptionCaptor.capture()); + assertEquals( + "Failed to update workflow state for this_id on step node_id to delete resource connector_id this_id", exceptionCaptor.getValue().getMessage() ); } diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java index bb7ba109d..f3e5ceea0 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java @@ -299,8 +299,13 @@ public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception { assertNotNull(resourcesCreated.get(0).resourceId()); // Hit Deprovision API - // By design, this may not completely deprovision the first time if it takes >2s to process removals Response deprovisionResponse = deprovisionWorkflow(client(), workflowId); + // Test for incremental removal + assertBusy(() -> { + List resourcesRemaining = getResourcesCreated(client(), workflowId); + assertTrue(resourcesRemaining.size() < 5); + }, 30, TimeUnit.SECONDS); + // By design, this may not completely deprovision the first time if it takes >2s to process removals try { assertBusy( () -> { getAndAssertWorkflowStatus(client(), workflowId, State.NOT_STARTED, ProvisioningProgress.NOT_STARTED); }, diff --git a/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java index 203255361..4841871aa 100644 --- a/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java @@ -175,6 +175,7 @@ public void testDeprovisionWorkflow() throws Exception { ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(WorkflowResponse.class); verify(listener, times(1)).onResponse(responseCaptor.capture()); assertEquals(workflowId, responseCaptor.getValue().getWorkflowId()); + verify(flowFrameworkIndicesHandler, times(1)).deleteResourceFromStateIndex(anyString(), any(ResourceCreated.class), any()); } public void testFailToDeprovision() throws Exception { @@ -208,6 +209,7 @@ public void testFailToDeprovision() throws Exception { verify(listener, times(1)).onFailure(exceptionCaptor.capture()); assertEquals(RestStatus.ACCEPTED, exceptionCaptor.getValue().getRestStatus()); assertEquals("Failed to deprovision some resources: [model_id modelId].", exceptionCaptor.getValue().getMessage()); + verify(flowFrameworkIndicesHandler, times(0)).deleteResourceFromStateIndex(anyString(), any(ResourceCreated.class), any()); } public void testAllowDeleteRequired() throws Exception { @@ -248,6 +250,7 @@ public void testAllowDeleteRequired() throws Exception { "These resources require the allow_delete parameter to deprovision: [index_name test-index].", exceptionCaptor.getValue().getMessage() ); + verify(flowFrameworkIndicesHandler, times(0)).deleteResourceFromStateIndex(anyString(), any(ResourceCreated.class), any()); // Test (2nd) failure with wrong allow_delete param workflowRequest = new WorkflowRequest(workflowId, null, Map.of(ALLOW_DELETE, "wrong-index")); @@ -264,6 +267,7 @@ public void testAllowDeleteRequired() throws Exception { "These resources require the allow_delete parameter to deprovision: [index_name test-index].", exceptionCaptor.getValue().getMessage() ); + verify(flowFrameworkIndicesHandler, times(0)).deleteResourceFromStateIndex(anyString(), any(ResourceCreated.class), any()); // Test success with correct allow_delete param workflowRequest = new WorkflowRequest(workflowId, null, Map.of(ALLOW_DELETE, "wrong-index,test-index,other-index")); @@ -280,6 +284,7 @@ public void testAllowDeleteRequired() throws Exception { ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(WorkflowResponse.class); verify(listener, times(1)).onResponse(responseCaptor.capture()); assertEquals(workflowId, responseCaptor.getValue().getWorkflowId()); + verify(flowFrameworkIndicesHandler, times(1)).deleteResourceFromStateIndex(anyString(), any(ResourceCreated.class), any()); } public void testFailToDeprovisionAndAllowDeleteRequired() throws Exception { @@ -323,5 +328,6 @@ public void testFailToDeprovisionAndAllowDeleteRequired() throws Exception { + " These resources require the allow_delete parameter to deprovision: [index_name test-index].", exceptionCaptor.getValue().getMessage() ); + verify(flowFrameworkIndicesHandler, times(0)).deleteResourceFromStateIndex(anyString(), any(ResourceCreated.class), any()); } } diff --git a/src/test/java/org/opensearch/flowframework/util/ApiSpecFetcherTests.java b/src/test/java/org/opensearch/flowframework/util/ApiSpecFetcherTests.java new file mode 100644 index 000000000..fb60ae08d --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/util/ApiSpecFetcherTests.java @@ -0,0 +1,130 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.util; + +import org.opensearch.flowframework.exception.ApiSpecParseException; +import org.opensearch.rest.RestRequest; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +import java.util.Arrays; +import java.util.List; + +import io.swagger.v3.oas.models.OpenAPI; + +import static org.opensearch.flowframework.common.CommonValue.ML_COMMONS_API_SPEC_YAML_URI; +import static org.opensearch.rest.RestRequest.Method.DELETE; +import static org.opensearch.rest.RestRequest.Method.PATCH; +import static org.opensearch.rest.RestRequest.Method.POST; +import static org.opensearch.rest.RestRequest.Method.PUT; + +public class ApiSpecFetcherTests extends OpenSearchTestCase { + + private ApiSpecFetcher apiSpecFetcher; + + @Before + public void setUp() throws Exception { + super.setUp(); + } + + public void testFetchApiSpecSuccess() throws Exception { + + OpenAPI result = ApiSpecFetcher.fetchApiSpec(ML_COMMONS_API_SPEC_YAML_URI); + + assertNotNull("The fetched OpenAPI spec should not be null", result); + } + + public void testFetchApiSpecThrowsException() throws Exception { + String invalidUri = "http://invalid-url.com/fail.yaml"; + + ApiSpecParseException exception = expectThrows(ApiSpecParseException.class, () -> { ApiSpecFetcher.fetchApiSpec(invalidUri); }); + + assertNotNull("Exception should be thrown for invalid URI", exception); + assertTrue(exception.getMessage().contains("Unable to parse spec")); + } + + public void testCompareRequiredFieldsSuccess() throws Exception { + + String path = "/_plugins/_ml/agents/_register"; + RestRequest.Method method = POST; + + // Assuming REGISTER_AGENT step in the enum has these required fields + List expectedRequiredParams = Arrays.asList("name", "type"); + + boolean comparisonResult = ApiSpecFetcher.compareRequiredFields(expectedRequiredParams, ML_COMMONS_API_SPEC_YAML_URI, path, method); + + assertTrue("The required fields should match between API spec and enum", comparisonResult); + } + + public void testCompareRequiredFieldsFailure() throws Exception { + + String path = "/_plugins/_ml/agents/_register"; + RestRequest.Method method = POST; + + List wrongRequiredParams = Arrays.asList("nonexistent_param"); + + boolean comparisonResult = ApiSpecFetcher.compareRequiredFields(wrongRequiredParams, ML_COMMONS_API_SPEC_YAML_URI, path, method); + + assertFalse("The required fields should not match for incorrect input", comparisonResult); + } + + public void testCompareRequiredFieldsThrowsException() throws Exception { + String invalidUri = "http://invalid-url.com/fail.yaml"; + String path = "/_plugins/_ml/agents/_register"; + RestRequest.Method method = PUT; + + Exception exception = expectThrows( + Exception.class, + () -> { ApiSpecFetcher.compareRequiredFields(List.of(), invalidUri, path, method); } + ); + + assertNotNull("An exception should be thrown for an invalid API spec Uri", exception); + assertTrue(exception.getMessage().contains("Unable to parse spec")); + } + + public void testUnsupportedMethodException() throws IllegalArgumentException { + Exception exception = expectThrows(Exception.class, () -> { + ApiSpecFetcher.compareRequiredFields( + List.of("name", "type"), + ML_COMMONS_API_SPEC_YAML_URI, + "/_plugins/_ml/agents/_register", + PATCH + ); + }); + + assertEquals("Unsupported HTTP method: PATCH", exception.getMessage()); + } + + public void testNoOperationFoundException() throws Exception { + Exception exception = expectThrows(IllegalArgumentException.class, () -> { + ApiSpecFetcher.compareRequiredFields( + List.of("name", "type"), + ML_COMMONS_API_SPEC_YAML_URI, + "/_plugins/_ml/agents/_register", + DELETE + ); + }); + + assertEquals("No operation found for the specified method: DELETE", exception.getMessage()); + } + + public void testNoRequestBodyDefinedException() throws ApiSpecParseException { + Exception exception = expectThrows(ApiSpecParseException.class, () -> { + ApiSpecFetcher.compareRequiredFields( + List.of("name", "type"), + ML_COMMONS_API_SPEC_YAML_URI, + "/_plugins/_ml/model_groups/{model_group_id}", + RestRequest.Method.GET + ); + }); + + assertEquals("No requestBody defined for this operation.", exception.getMessage()); + } + +} diff --git a/src/test/java/org/opensearch/flowframework/workflow/RegisterAgentTests.java b/src/test/java/org/opensearch/flowframework/workflow/RegisterAgentTests.java index 626dfdfa1..c2b3dcca1 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/RegisterAgentTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/RegisterAgentTests.java @@ -13,6 +13,7 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; +import org.opensearch.flowframework.util.ApiSpecFetcher; import org.opensearch.ml.client.MachineLearningNodeClient; import org.opensearch.ml.common.MLAgentType; import org.opensearch.ml.common.agent.LLMSpec; @@ -20,10 +21,12 @@ import org.opensearch.ml.common.agent.MLMemorySpec; import org.opensearch.ml.common.agent.MLToolSpec; import org.opensearch.ml.common.transport.agent.MLRegisterAgentResponse; +import org.opensearch.rest.RestRequest; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -31,6 +34,7 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import static org.opensearch.flowframework.common.CommonValue.ML_COMMONS_API_SPEC_YAML_URI; import static org.opensearch.flowframework.common.WorkflowResources.AGENT_ID; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; @@ -150,4 +154,18 @@ public void testRegisterAgentFailure() throws IOException { assertTrue(ex.getCause() instanceof FlowFrameworkException); assertEquals("Failed to register the agent", ex.getCause().getMessage()); } + + public void testApiSpecRegisterAgentInputParamComparison() throws Exception { + List requiredEnumParams = WorkflowStepFactory.WorkflowSteps.REGISTER_AGENT.inputs(); + + boolean isMatch = ApiSpecFetcher.compareRequiredFields( + requiredEnumParams, + ML_COMMONS_API_SPEC_YAML_URI, + "/_plugins/_ml/agents/_register", + RestRequest.Method.POST + ); + + assertTrue(isMatch); + } + } diff --git a/src/test/java/org/opensearch/flowframework/workflow/ToolStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/ToolStepTests.java index 20d177dc9..2b5e5b7fa 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/ToolStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/ToolStepTests.java @@ -102,7 +102,7 @@ public void testTool() throws ExecutionException, InterruptedException { Collections.emptyMap() ); assertTrue(future.isDone()); - assertEquals(MLToolSpec.class, future.get().getContent().get("tools").getClass()); + assertEquals(MLToolSpec.class, future.get().getContent().get("tools").getClass()); assertEquals(Map.of("foo", "bar"), ((MLToolSpec) future.get().getContent().get("tools")).getConfigMap()); }