diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 14c2ca25d..8bbc30658 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -81,6 +81,6 @@ jobs: run: | ./gradlew integTest yamlRestTest - name: Multi Nodes Integration Testing - if: matrix.java == 21 + if: matrix.java == '21.0.1' run: | ./gradlew integTest -PnumNodes=3 diff --git a/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java b/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java index 1d450cd26..b8b67cbd4 100644 --- a/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java +++ b/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java @@ -45,6 +45,7 @@ import org.opensearch.flowframework.model.Template; import org.opensearch.flowframework.model.WorkflowState; import org.opensearch.test.rest.OpenSearchRestTestCase; +import org.junit.After; import org.junit.Before; import java.io.IOException; @@ -75,56 +76,49 @@ public abstract class FlowFrameworkRestTestCase extends OpenSearchRestTestCase { @Before protected void setUpSettings() throws Exception { - if (!indexExistsWithAdminClient(".plugins-ml-config")) { - // Initial cluster set up - - // Enable Flow Framework Plugin Rest APIs - Response response = TestHelpers.makeRequest( - client(), - "PUT", - "_cluster/settings", - null, - "{\"transient\":{\"plugins.flow_framework.enabled\":true}}", - List.of(new BasicHeader(HttpHeaders.USER_AGENT, "")) - ); - assertEquals(200, response.getStatusLine().getStatusCode()); - - // Enable ML Commons to run on non-ml nodes - response = TestHelpers.makeRequest( - client(), - "PUT", - "_cluster/settings", - null, - "{\"persistent\":{\"plugins.ml_commons.only_run_on_ml_node\":false}}", - List.of(new BasicHeader(HttpHeaders.USER_AGENT, "")) - ); - assertEquals(200, response.getStatusLine().getStatusCode()); - - // Enable local model registration via URL - response = TestHelpers.makeRequest( - client(), - "PUT", - "_cluster/settings", - null, - "{\"persistent\":{\"plugins.ml_commons.allow_registering_model_via_url\":true}}", - List.of(new BasicHeader(HttpHeaders.USER_AGENT, "")) - ); - assertEquals(200, response.getStatusLine().getStatusCode()); - - // Set ML jvm heap memory threshold to 100 to avoid opening the circuit breaker during tests - response = TestHelpers.makeRequest( - client(), - "PUT", - "_cluster/settings", - null, - "{\"persistent\":{\"plugins.ml_commons.jvm_heap_memory_threshold\":100}}", - List.of(new BasicHeader(HttpHeaders.USER_AGENT, "")) - ); - assertEquals(200, response.getStatusLine().getStatusCode()); + // Enable Flow Framework Plugin Rest APIs + Response response = TestHelpers.makeRequest( + client(), + "PUT", + "_cluster/settings", + null, + "{\"transient\":{\"plugins.flow_framework.enabled\":true}}", + List.of(new BasicHeader(HttpHeaders.USER_AGENT, "")) + ); + assertEquals(200, response.getStatusLine().getStatusCode()); - // Ensure .plugins-ml-config is created before proceeding with integration tests - assertBusy(() -> { assertTrue(indexExistsWithAdminClient(".plugins-ml-config")); }, 60, TimeUnit.SECONDS); - } + // Enable ML Commons to run on non-ml nodes + response = TestHelpers.makeRequest( + client(), + "PUT", + "_cluster/settings", + null, + "{\"persistent\":{\"plugins.ml_commons.only_run_on_ml_node\":false}}", + List.of(new BasicHeader(HttpHeaders.USER_AGENT, "")) + ); + assertEquals(200, response.getStatusLine().getStatusCode()); + + // Enable local model registration via URL + response = TestHelpers.makeRequest( + client(), + "PUT", + "_cluster/settings", + null, + "{\"persistent\":{\"plugins.ml_commons.allow_registering_model_via_url\":true}}", + List.of(new BasicHeader(HttpHeaders.USER_AGENT, "")) + ); + assertEquals(200, response.getStatusLine().getStatusCode()); + + // Set ML jvm heap memory threshold to 100 to avoid opening the circuit breaker during tests + response = TestHelpers.makeRequest( + client(), + "PUT", + "_cluster/settings", + null, + "{\"persistent\":{\"plugins.ml_commons.jvm_heap_memory_threshold\":100}}", + List.of(new BasicHeader(HttpHeaders.USER_AGENT, "")) + ); + assertEquals(200, response.getStatusLine().getStatusCode()); // Set up clients if running in security enabled cluster if (isHttps()) { @@ -132,7 +126,7 @@ protected void setUpSettings() throws Exception { String readAccessUserPassword = generatePassword(READ_ACCESS_USER); // Configure full access user and client, needs ML Full Access role as well - Response response = createUser( + response = createUser( FULL_ACCESS_USER, fullAccessUserPassword, List.of(FLOW_FRAMEWORK_FULL_ACCESS_ROLE, ML_COMMONS_FULL_ACCESS_ROLE) @@ -187,7 +181,7 @@ protected static void deleteIndexWithAdminClient(String name) throws IOException // Utility fn for checking if an index exists. Should only be used when not allowed in a regular context // (e.g., checking existence of system indices) - protected static boolean indexExistsWithAdminClient(String indexName) throws IOException { + public static boolean indexExistsWithAdminClient(String indexName) throws IOException { Request request = new Request("HEAD", "/" + indexName); Response response = adminClient().performRequest(request); return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode(); @@ -253,6 +247,37 @@ protected static void configureHttpsClient(RestClientBuilder builder, Settings s } } + @SuppressWarnings("unchecked") + @After + protected void wipeAllODFEIndices() throws IOException { + Response response = adminClient().performRequest(new Request("GET", "/_cat/indices?format=json&expand_wildcards=all")); + MediaType xContentType = MediaType.fromMediaType(response.getEntity().getContentType()); + try ( + XContentParser parser = xContentType.xContent() + .createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + response.getEntity().getContent() + ) + ) { + XContentParser.Token token = parser.nextToken(); + List> parserList = null; + if (token == XContentParser.Token.START_ARRAY) { + parserList = parser.listOrderedMap().stream().map(obj -> (Map) obj).collect(Collectors.toList()); + } else { + parserList = Collections.singletonList(parser.mapOrdered()); + } + + for (Map index : parserList) { + String indexName = (String) index.get("index"); + // Do not reset ML encryption index as this is needed to encrypt connector credentials + if (indexName != null && !".opendistro_security".equals(indexName) && !".plugins-ml-config".equals(indexName)) { + adminClient().performRequest(new Request("DELETE", "/" + indexName)); + } + } + } + } + /** * wipeAllIndices won't work since it cannot delete security index. Use wipeAllSystemIndices instead. */ @@ -290,12 +315,13 @@ protected Response createWorkflow(RestClient client, Template template) throws E /** * Helper method to invoke the Create Workflow Rest Action with provision + * @param client the rest client * @param template the template to create * @throws Exception if the request fails * @return a rest response */ - protected Response createWorkflowWithProvision(Template template) throws Exception { - return TestHelpers.makeRequest(client(), "POST", WORKFLOW_URI + "?provision=true", Collections.emptyMap(), template.toJson(), null); + protected Response createWorkflowWithProvision(RestClient client, Template template) throws Exception { + return TestHelpers.makeRequest(client, "POST", WORKFLOW_URI + "?provision=true", Collections.emptyMap(), template.toJson(), null); } /** diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java index c097fce6b..dc24fc4cb 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java @@ -74,7 +74,7 @@ public void testFailedUpdateWorkflow() throws Exception { assertEquals(RestStatus.CREATED, TestHelpers.restStatus(responseCreate)); Template template = TestHelpers.createTemplateFromFile("createconnector-registerremotemodel-deploymodel.json"); - Thread.sleep(1000); + ResponseException exception = expectThrows(ResponseException.class, () -> updateWorkflow(client(), "123", template)); assertTrue(exception.getMessage().contains("Failed to get template: 123")); @@ -215,8 +215,14 @@ public void testCreateAndProvisionRemoteModelWorkflow() throws Exception { String workflowId = (String) responseMap.get(WORKFLOW_ID); getAndAssertWorkflowStatus(client(), workflowId, State.NOT_STARTED, ProvisioningProgress.NOT_STARTED); - // Hit Provision API and assert status - response = provisionWorkflow(client(), workflowId); + // Ensure Ml config index is initialized as creating a connector requires this, then hit Provision API and assert status + if(!indexExistsWithAdminClient(".plugins-ml-config")) { + assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 25, TimeUnit.SECONDS); + response = provisionWorkflow(client(), workflowId); + } else { + response = provisionWorkflow(client(), workflowId); + } + assertEquals(RestStatus.OK, TestHelpers.restStatus(response)); getAndAssertWorkflowStatus(client(), workflowId, State.PROVISIONING, ProvisioningProgress.IN_PROGRESS); @@ -243,7 +249,13 @@ public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception { Template template = TestHelpers.createTemplateFromFile("agent-framework.json"); // Hit Create Workflow API to create agent-framework template, with template validation check and provision parameter - Response response = createWorkflowWithProvision(template); + Response response; + if(!indexExistsWithAdminClient(".plugins-ml-config")) { + assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 25, TimeUnit.SECONDS); + response = createWorkflowWithProvision(client(), template); + } else { + response = createWorkflowWithProvision(client(), template); + } assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); Map responseMap = entityAsMap(response); String workflowId = (String) responseMap.get(WORKFLOW_ID); diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkSecureRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkSecureRestApiIT.java index e83e7f08e..c9d6641c6 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkSecureRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkSecureRestApiIT.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID; @@ -116,7 +117,12 @@ public void testCreateProvisionDeprovisionWorkflowWithFullAccess() throws Except assertEquals(RestStatus.OK, searchResponse.status()); // Invoke provision API - response = provisionWorkflow(fullAccessClient(), workflowId); + if(!indexExistsWithAdminClient(".plugins-ml-config")) { + assertBusy(() -> assertTrue(indexExistsWithAdminClient(".plugins-ml-config")), 25, TimeUnit.SECONDS); + response = provisionWorkflow(fullAccessClient(), workflowId); + } else { + response = provisionWorkflow(fullAccessClient(), workflowId); + } assertEquals(RestStatus.OK, TestHelpers.restStatus(response)); // Invoke status API