Skip to content

Commit

Permalink
Renaming State API implementation, changing names to GetWorkflowState
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Palis <[email protected]>
  • Loading branch information
joshpalis committed Dec 11, 2023
1 parent 3f7649a commit 59c7f2a
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.rest.RestCreateWorkflowAction;
import org.opensearch.flowframework.rest.RestGetTemplateAction;
import org.opensearch.flowframework.rest.RestGetWorkflowAction;
import org.opensearch.flowframework.rest.RestGetWorkflowStateAction;
import org.opensearch.flowframework.rest.RestProvisionWorkflowAction;
import org.opensearch.flowframework.rest.RestSearchWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowTransportAction;
import org.opensearch.flowframework.transport.GetTemplateAction;
import org.opensearch.flowframework.transport.GetTemplateTransportAction;
import org.opensearch.flowframework.transport.GetWorkflowAction;
import org.opensearch.flowframework.transport.GetWorkflowTransportAction;
import org.opensearch.flowframework.transport.GetWorkflowStateAction;
import org.opensearch.flowframework.transport.GetWorkflowStateTransportAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowTransportAction;
import org.opensearch.flowframework.transport.SearchWorkflowAction;
Expand Down Expand Up @@ -128,7 +128,7 @@ public List<RestHandler> getRestHandlers(
new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting, settings, clusterService),
new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestSearchWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestGetWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestGetWorkflowStateAction(flowFrameworkFeatureEnabledSetting),
new RestGetTemplateAction(flowFrameworkFeatureEnabledSetting)
);
}
Expand All @@ -139,7 +139,7 @@ public List<RestHandler> getRestHandlers(
new ActionHandler<>(CreateWorkflowAction.INSTANCE, CreateWorkflowTransportAction.class),
new ActionHandler<>(ProvisionWorkflowAction.INSTANCE, ProvisionWorkflowTransportAction.class),
new ActionHandler<>(SearchWorkflowAction.INSTANCE, SearchWorkflowTransportAction.class),
new ActionHandler<>(GetWorkflowAction.INSTANCE, GetWorkflowTransportAction.class),
new ActionHandler<>(GetWorkflowStateAction.INSTANCE, GetWorkflowStateTransportAction.class),
new ActionHandler<>(GetTemplateAction.INSTANCE, GetTemplateTransportAction.class)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.transport.GetWorkflowAction;
import org.opensearch.flowframework.transport.GetWorkflowRequest;
import org.opensearch.flowframework.transport.GetWorkflowStateAction;
import org.opensearch.flowframework.transport.GetWorkflowStateRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
Expand All @@ -36,23 +36,23 @@
/**
* Rest Action to facilitate requests to get a workflow status
*/
public class RestGetWorkflowAction extends BaseRestHandler {
public class RestGetWorkflowStateAction extends BaseRestHandler {

private static final String GET_WORKFLOW_ACTION = "get_workflow";
private static final Logger logger = LogManager.getLogger(RestGetWorkflowAction.class);
private static final String GET_WORKFLOW_STATE_ACTION = "get_workflow_state";
private static final Logger logger = LogManager.getLogger(RestGetWorkflowStateAction.class);
private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;

/**
* Instantiates a new RestGetWorkflowAction
* Instantiates a new RestGetWorkflowStateAction
* @param flowFrameworkFeatureEnabledSetting Whether this API is enabled
*/
public RestGetWorkflowAction(FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting) {
public RestGetWorkflowStateAction(FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting) {
this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting;
}

@Override
public String getName() {
return GET_WORKFLOW_ACTION;
return GET_WORKFLOW_STATE_ACTION;
}

@Override
Expand All @@ -77,8 +77,8 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
}

boolean all = request.paramAsBoolean("all", false);
GetWorkflowRequest getWorkflowRequest = new GetWorkflowRequest(workflowId, all);
return channel -> client.execute(GetWorkflowAction.INSTANCE, getWorkflowRequest, ActionListener.wrap(response -> {
GetWorkflowStateRequest getWorkflowRequest = new GetWorkflowStateRequest(workflowId, all);
return channel -> client.execute(GetWorkflowStateAction.INSTANCE, getWorkflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
Expand All @@ -103,7 +103,6 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
@Override
public List<Route> routes() {
return ImmutableList.of(
// Provision workflow from indexed use case template
new Route(RestRequest.Method.GET, String.format(Locale.ROOT, "%s/{%s}/%s", WORKFLOW_URI, WORKFLOW_ID, "_status"))
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@
import static org.opensearch.flowframework.common.CommonValue.TRANSPORT_ACTION_NAME_PREFIX;

/**
* External Action for public facing RestGetWorkflowAction
* External Action for public facing RestGetWorkflowStateAction
*/
public class GetWorkflowAction extends ActionType<GetWorkflowResponse> {
public class GetWorkflowStateAction extends ActionType<GetWorkflowStateResponse> {
/** The name of this action */
public static final String NAME = TRANSPORT_ACTION_NAME_PREFIX + "workflow/get";
public static final String NAME = TRANSPORT_ACTION_NAME_PREFIX + "workflowstate/get";
/** An instance of this action */
public static final GetWorkflowAction INSTANCE = new GetWorkflowAction();
public static final GetWorkflowStateAction INSTANCE = new GetWorkflowStateAction();

private GetWorkflowAction() {
super(NAME, GetWorkflowResponse::new);
private GetWorkflowStateAction() {
super(NAME, GetWorkflowStateResponse::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import java.io.IOException;

/**
* Transport Request to get a workflow or workflow status
* Transport Request to get a workflow state
*/
public class GetWorkflowRequest extends ActionRequest {
public class GetWorkflowStateRequest extends ActionRequest {

/**
* The documentId of the workflow entry within the Global Context index
Expand All @@ -37,7 +37,7 @@ public class GetWorkflowRequest extends ActionRequest {
* @param workflowId the documentId of the workflow
* @param all whether the get request is looking for all fields in status
*/
public GetWorkflowRequest(@Nullable String workflowId, boolean all) {
public GetWorkflowStateRequest(@Nullable String workflowId, boolean all) {
this.workflowId = workflowId;
this.all = all;
}
Expand All @@ -47,7 +47,7 @@ public GetWorkflowRequest(@Nullable String workflowId, boolean all) {
* @param in The input stream to read from
* @throws IOException If the stream cannot be read properly
*/
public GetWorkflowRequest(StreamInput in) throws IOException {
public GetWorkflowStateRequest(StreamInput in) throws IOException {
super(in);
this.workflowId = in.readString();
this.all = in.readBoolean();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
/**
* Transport Response from getting a workflow status
*/
public class GetWorkflowResponse extends ActionResponse implements ToXContentObject {
public class GetWorkflowStateResponse extends ActionResponse implements ToXContentObject {

/** The workflow state */
public WorkflowState workflowState;
Expand All @@ -32,7 +32,7 @@ public class GetWorkflowResponse extends ActionResponse implements ToXContentObj
* @param in the input stream to read from
* @throws IOException if the workflowId cannot be read from the input stream
*/
public GetWorkflowResponse(StreamInput in) throws IOException {
public GetWorkflowStateResponse(StreamInput in) throws IOException {
super(in);
workflowState = new WorkflowState(in);
allStatus = in.readBoolean();
Expand All @@ -43,7 +43,7 @@ public GetWorkflowResponse(StreamInput in) throws IOException {
* @param workflowState the workflow state object
* @param allStatus whether to return all fields in state index
*/
public GetWorkflowResponse(WorkflowState workflowState, boolean allStatus) {
public GetWorkflowStateResponse(WorkflowState workflowState, boolean allStatus) {
if (allStatus) {
this.workflowState = workflowState;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,34 +38,34 @@
* Transport Action to get a specific workflow. Currently, we only support the action with _status
* in the API path but will add the ability to get the workflow and not just the status in the future
*/
public class GetWorkflowTransportAction extends HandledTransportAction<GetWorkflowRequest, GetWorkflowResponse> {
public class GetWorkflowStateTransportAction extends HandledTransportAction<GetWorkflowStateRequest, GetWorkflowStateResponse> {

private final Logger logger = LogManager.getLogger(GetWorkflowTransportAction.class);
private final Logger logger = LogManager.getLogger(GetWorkflowStateTransportAction.class);

private final Client client;
private final NamedXContentRegistry xContentRegistry;

/**
* Intantiates a new CreateWorkflowTransportAction
* Intantiates a new GetWorkflowStateTransportAction
* @param transportService The TransportService
* @param actionFilters action filters
* @param client The client used to make the request to OS
* @param xContentRegistry contentRegister to parse get response
*/
@Inject
public GetWorkflowTransportAction(
public GetWorkflowStateTransportAction(
TransportService transportService,
ActionFilters actionFilters,
Client client,
NamedXContentRegistry xContentRegistry
) {
super(GetWorkflowAction.NAME, transportService, actionFilters, GetWorkflowRequest::new);
super(GetWorkflowStateAction.NAME, transportService, actionFilters, GetWorkflowStateRequest::new);
this.client = client;
this.xContentRegistry = xContentRegistry;
}

@Override
protected void doExecute(Task task, GetWorkflowRequest request, ActionListener<GetWorkflowResponse> listener) {
protected void doExecute(Task task, GetWorkflowStateRequest request, ActionListener<GetWorkflowStateResponse> listener) {
String workflowId = request.getWorkflowId();
User user = ParseUtils.getUserContext(client);
GetRequest getRequest = new GetRequest(WORKFLOW_STATE_INDEX).id(workflowId);
Expand All @@ -75,7 +75,7 @@ protected void doExecute(Task task, GetWorkflowRequest request, ActionListener<G
try (XContentParser parser = ParseUtils.createXContentParserFromRegistry(xContentRegistry, r.getSourceAsBytesRef())) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
WorkflowState workflowState = WorkflowState.parse(parser);
listener.onResponse(new GetWorkflowResponse(workflowState, request.getAll()));
listener.onResponse(new GetWorkflowStateResponse(workflowState, request.getAll()));
} catch (Exception e) {
logger.error("Failed to parse workflowState" + r.getId(), e);
listener.onFailure(new FlowFrameworkException("Failed to parse workflowState" + r.getId(), RestStatus.BAD_REQUEST));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class RestGetWorkflowActionTests extends OpenSearchTestCase {
private RestGetWorkflowAction restGetWorkflowAction;
public class RestGetWorkflowStateActionTests extends OpenSearchTestCase {
private RestGetWorkflowStateAction restGetWorkflowAction;
private String getPath;
private NodeClient nodeClient;
private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;
Expand All @@ -39,18 +39,18 @@ public void setUp() throws Exception {
this.getPath = String.format(Locale.ROOT, "%s/{%s}/%s", WORKFLOW_URI, "workflow_id", "_status");
flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkFeatureEnabledSetting.class);
when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(true);
this.restGetWorkflowAction = new RestGetWorkflowAction(flowFrameworkFeatureEnabledSetting);
this.restGetWorkflowAction = new RestGetWorkflowStateAction(flowFrameworkFeatureEnabledSetting);
this.nodeClient = mock(NodeClient.class);
}

public void testConstructor() {
RestGetWorkflowAction getWorkflowAction = new RestGetWorkflowAction(flowFrameworkFeatureEnabledSetting);
RestGetWorkflowStateAction getWorkflowAction = new RestGetWorkflowStateAction(flowFrameworkFeatureEnabledSetting);
assertNotNull(getWorkflowAction);
}

public void testRestGetWorkflowActionName() {
String name = restGetWorkflowAction.getName();
assertEquals("get_workflow", name);
assertEquals("get_workflow_state", name);
}

public void testRestGetWorkflowActionRoutes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,22 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class GetWorkflowTransportActionTests extends OpenSearchTestCase {
public class GetWorkflowStateTransportActionTests extends OpenSearchTestCase {

private GetWorkflowTransportAction getWorkflowTransportAction;
private GetWorkflowStateTransportAction getWorkflowTransportAction;
private FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;
private Client client;
private ThreadPool threadPool;
private ThreadContext threadContext;
private ActionListener<GetWorkflowResponse> response;
private ActionListener<GetWorkflowStateResponse> response;
private Task task;

@Override
public void setUp() throws Exception {
super.setUp();
this.client = mock(Client.class);
this.threadPool = mock(ThreadPool.class);
this.getWorkflowTransportAction = new GetWorkflowTransportAction(
this.getWorkflowTransportAction = new GetWorkflowStateTransportAction(
mock(TransportService.class),
mock(ActionFilters.class),
client,
Expand All @@ -65,9 +65,9 @@ public void setUp() throws Exception {
when(client.threadPool()).thenReturn(clientThreadPool);
when(clientThreadPool.getThreadContext()).thenReturn(threadContext);

response = new ActionListener<GetWorkflowResponse>() {
response = new ActionListener<GetWorkflowStateResponse>() {
@Override
public void onResponse(GetWorkflowResponse getResponse) {
public void onResponse(GetWorkflowStateResponse getResponse) {
assertTrue(true);
}

Expand All @@ -78,21 +78,21 @@ public void onFailure(Exception e) {}
}

public void testGetTransportAction() throws IOException {
GetWorkflowRequest getWorkflowRequest = new GetWorkflowRequest("1234", false);
GetWorkflowStateRequest getWorkflowRequest = new GetWorkflowStateRequest("1234", false);
getWorkflowTransportAction.doExecute(task, getWorkflowRequest, response);
}

public void testGetAction() {
Assert.assertNotNull(GetWorkflowAction.INSTANCE.name());
Assert.assertEquals(GetWorkflowAction.INSTANCE.name(), GetWorkflowAction.NAME);
Assert.assertNotNull(GetWorkflowStateAction.INSTANCE.name());
Assert.assertEquals(GetWorkflowStateAction.INSTANCE.name(), GetWorkflowStateAction.NAME);
}

public void testGetAnomalyDetectorRequest() throws IOException {
GetWorkflowRequest request = new GetWorkflowRequest("1234", false);
GetWorkflowStateRequest request = new GetWorkflowStateRequest("1234", false);
BytesStreamOutput out = new BytesStreamOutput();
request.writeTo(out);
StreamInput input = out.bytes().streamInput();
GetWorkflowRequest newRequest = new GetWorkflowRequest(input);
GetWorkflowStateRequest newRequest = new GetWorkflowStateRequest(input);
Assert.assertEquals(request.getWorkflowId(), newRequest.getWorkflowId());
Assert.assertEquals(request.getAll(), newRequest.getAll());
Assert.assertNull(newRequest.validate());
Expand All @@ -113,10 +113,10 @@ public void testGetAnomalyDetectorResponse() throws IOException {
Collections.emptyList()
);

GetWorkflowResponse response = new GetWorkflowResponse(workFlowState, false);
GetWorkflowStateResponse response = new GetWorkflowStateResponse(workFlowState, false);
response.writeTo(out);
NamedWriteableAwareStreamInput input = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), writableRegistry());
GetWorkflowResponse newResponse = new GetWorkflowResponse(input);
GetWorkflowStateResponse newResponse = new GetWorkflowStateResponse(input);
XContentBuilder builder = TestHelpers.builder();
Assert.assertNotNull(newResponse.toXContent(builder, ToXContent.EMPTY_PARAMS));

Expand Down

0 comments on commit 59c7f2a

Please sign in to comment.