Skip to content

Commit

Permalink
[Backport 2.x] Add integ tests for search state api, deprovision api,…
Browse files Browse the repository at this point in the history
… and delete api (#316)

* Add integ tests for search state api, deprovision api, and delete api (#313)

* add integ tests for search state api, deprovision api, and delete api

Signed-off-by: Jackie Han <[email protected]>

* spotless check clean up

Signed-off-by: Jackie Han <[email protected]>

* add timeout

Signed-off-by: Jackie Han <[email protected]>

* add more timeout

Signed-off-by: Jackie Han <[email protected]>

* Add thread sleep to wait for deletion to complete

Signed-off-by: Jackie Han <[email protected]>

---------

Signed-off-by: Jackie Han <[email protected]>
(cherry picked from commit 4efbc95)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

* Fix 2.x compatibility

Signed-off-by: Daniel Widdis <[email protected]>

---------

Signed-off-by: Jackie Han <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Signed-off-by: Daniel Widdis <[email protected]>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Daniel Widdis <[email protected]>
  • Loading branch information
3 people authored Dec 24, 2023
1 parent f98be8f commit a7d9d7c
Show file tree
Hide file tree
Showing 4 changed files with 344 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@
*/
package org.opensearch.flowframework.model;

import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.common.ParsingException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParseException;
Expand Down Expand Up @@ -393,6 +396,22 @@ public static WorkflowState parse(XContentParser parser) throws IOException {
.build();
}

/**
* Parse a JSON workflow state
* @param json A string containing a JSON representation of a workflow state
* @return A {@link WorkflowState} represented by the JSON
* @throws IOException on failure to parse
*/
public static WorkflowState parse(String json) throws IOException {
XContentParser parser = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
json
);
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
return parse(parser);
}

/**
* The workflowID associated with this workflow-state
* @return the workflowId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,10 @@ protected Response createWorkflow(Template template) throws Exception {
return TestHelpers.makeRequest(client(), "POST", WORKFLOW_URI, ImmutableMap.of(), template.toJson(), null);
}

protected Response createWorkflowWithProvision(Template template) throws Exception {
return TestHelpers.makeRequest(client(), "POST", WORKFLOW_URI + "?provision=true", ImmutableMap.of(), template.toJson(), null);
}

/**
* Helper method to invoke the Create Workflow Rest Action with dry run validation
* @param template the template to create
Expand Down Expand Up @@ -343,6 +347,40 @@ protected Response provisionWorkflow(String workflowId) throws Exception {
);
}

/**
* Helper method to invoke the Deprovision Workflow Rest Action
* @param workflowId the workflow ID to deprovision
* @return a rest response
* @throws Exception if the request fails
*/
protected Response deprovisionWorkflow(String workflowId) throws Exception {
return TestHelpers.makeRequest(
client(),
"POST",
String.format(Locale.ROOT, "%s/%s/%s", WORKFLOW_URI, workflowId, "_deprovision"),
ImmutableMap.of(),
"",
null
);
}

/**
* Helper method to invoke the Delete Workflow Rest Action
* @param workflowId the workflow ID to delete
* @return a rest response
* @throws Exception if the request fails
*/
protected Response deleteWorkflow(String workflowId) throws Exception {
return TestHelpers.makeRequest(
client(),
"DELETE",
String.format(Locale.ROOT, "%s/%s", WORKFLOW_URI, workflowId),
ImmutableMap.of(),
"",
null
);
}

/**
* Helper method to invoke the Get Workflow Rest Action
* @param workflowId the workflow ID to get the status
Expand Down Expand Up @@ -395,6 +433,31 @@ protected SearchResponse searchWorkflows(String query) throws Exception {
}
}

protected SearchResponse searchWorkflowState(String query) throws Exception {
Response restSearchResponse = TestHelpers.makeRequest(
client(),
"GET",
String.format(Locale.ROOT, "%s/state/_search", WORKFLOW_URI),
ImmutableMap.of(),
query,
null
);
assertEquals(RestStatus.OK, TestHelpers.restStatus(restSearchResponse));

// Parse entity content into SearchResponse
MediaType mediaType = MediaType.fromMediaType(restSearchResponse.getEntity().getContentType().getValue());
try (
XContentParser parser = mediaType.xContent()
.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
restSearchResponse.getEntity().getContent()
)
) {
return SearchResponse.fromXContent(parser);
}
}

/**
* Helper method to invoke the Get Workflow Rest Action and assert the provisioning and state status
* @param workflowId the workflow ID to get the status
Expand All @@ -408,8 +471,8 @@ protected void getAndAssertWorkflowStatus(String workflowId, State stateStatus,
assertEquals(RestStatus.OK, TestHelpers.restStatus(response));

Map<String, Object> responseMap = entityAsMap(response);
assertEquals(stateStatus.name(), (String) responseMap.get(CommonValue.STATE_FIELD));
assertEquals(provisioningStatus.name(), (String) responseMap.get(CommonValue.PROVISIONING_PROGRESS_FIELD));
assertEquals(stateStatus.name(), responseMap.get(CommonValue.STATE_FIELD));
assertEquals(provisioningStatus.name(), responseMap.get(CommonValue.PROVISIONING_PROGRESS_FIELD));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,15 @@
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.model.WorkflowEdge;
import org.opensearch.flowframework.model.WorkflowNode;
import org.opensearch.flowframework.model.WorkflowState;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.flowframework.common.CommonValue.CREDENTIAL_FIELD;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW;
Expand Down Expand Up @@ -174,4 +179,58 @@ public void testCreateAndProvisionRemoteModelWorkflow() throws Exception {
assertNotNull(resourcesCreated.get(0).resourceId());
}

public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception {
Template template = TestHelpers.createTemplateFromFile("agent-framework.json");

// Hit Create Workflow API to create agent-framework template, with template validation check and provision parameter
Response response = createWorkflowWithProvision(template);
assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response));
Map<String, Object> responseMap = entityAsMap(response);
String workflowId = (String) responseMap.get(WORKFLOW_ID);
// wait and ensure state is completed/done
assertBusy(() -> { getAndAssertWorkflowStatus(workflowId, State.COMPLETED, ProvisioningProgress.DONE); }, 30, TimeUnit.SECONDS);

// Hit Search State API with the workflow id created above
String query = "{\"query\":{\"ids\":{\"values\":[\"" + workflowId + "\"]}}}";
SearchResponse searchResponse = searchWorkflowState(query);
assertEquals(1, searchResponse.getHits().getTotalHits().value);
String searchHitSource = searchResponse.getHits().getAt(0).getSourceAsString();
WorkflowState searchHitWorkflowState = WorkflowState.parse(searchHitSource);

// Assert based on the agent-framework template
List<ResourceCreated> resourcesCreated = searchHitWorkflowState.resourcesCreated();
Set<String> expectedStepNames = new HashSet<>();
expectedStepNames.add("root_agent");
expectedStepNames.add("sub_agent");
expectedStepNames.add("openAI_connector");
expectedStepNames.add("gpt-3.5-model");
expectedStepNames.add("deployed-gpt-3.5-model");
Set<String> stepNames = resourcesCreated.stream().map(ResourceCreated::workflowStepId).collect(Collectors.toSet());

assertEquals(5, resourcesCreated.size());
assertEquals(stepNames, expectedStepNames);
assertNotNull(resourcesCreated.get(0).resourceId());

// Hit Deprovision API
Response deprovisionResponse = deprovisionWorkflow(workflowId);
assertEquals(RestStatus.OK, TestHelpers.restStatus(deprovisionResponse));
assertBusy(
() -> { getAndAssertWorkflowStatus(workflowId, State.NOT_STARTED, ProvisioningProgress.NOT_STARTED); },
30,
TimeUnit.SECONDS
);

// Hit Delete API
Response deleteResponse = deleteWorkflow(workflowId);
assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse));

// wait for deletion to complete
Thread.sleep(30000);

// Search this workflow id in global_context index to make sure it's deleted
SearchResponse searchResponseAfterDeletion = searchWorkflows(query);
assertBusy(() -> assertEquals(0, searchResponseAfterDeletion.getHits().getTotalHits().value), 30, TimeUnit.SECONDS);

}

}
Loading

0 comments on commit a7d9d7c

Please sign in to comment.