Skip to content

Commit

Permalink
Addressing PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Palis <[email protected]>
  • Loading branch information
joshpalis committed Oct 12, 2023
1 parent 09dd471 commit 4d96e50
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ public class CommonValue {
/** Global Context index mapping version */
public static final Integer GLOBAL_CONTEXT_INDEX_VERSION = 1;

/** The transport action name prefix */
public static final String TRANSPORT_ACION_NAME_PREFIX = "cluster:admin/opensearch/flow_framework/";
/** The base URI for this plugin's rest actions */
public static final String AI_FLOW_FRAMEWORK_BASE_URI = "/_plugins/_flow_framework";
public static final String FLOW_FRAMEWORK_BASE_URI = "/_plugins/_flow_framework";
/** The URI for this plugin's workflow rest actions */
public static final String WORKFLOW_URI = AI_FLOW_FRAMEWORK_BASE_URI + "/workflow";
public static final String WORKFLOW_URI = FLOW_FRAMEWORK_BASE_URI + "/workflow";
/** Field name for workflow Id, the document Id of the indexed use case template */
public static final String WORKFLOW_ID = "workflow_id";
/** The field name for provision workflow within a use case template*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void putTemplateToGlobalContext(Template template, ActionListener<IndexRe
* @param template the use-case template
* @param listener action listener
*/
public void updateTemplate(String documentId, Template template, ActionListener<IndexResponse> listener) {
public void updateTemplateInGlobalContext(String documentId, Template template, ActionListener<IndexResponse> listener) {
if (!createIndexStep.doesIndexExist(GLOBAL_CONTEXT_INDEX)) {
String exceptionMessage = "Failed to update template for workflow_id : "
+ documentId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@

import org.opensearch.action.ActionType;

import static org.opensearch.flowframework.common.CommonValue.TRANSPORT_ACION_NAME_PREFIX;

/**
* External Action for public facing RestCreateWorkflowActiom
*/
public class CreateWorkflowAction extends ActionType<WorkflowResponse> {

// TODO : Determine external action prefix for plugin
/** The name of this action */
public static final String NAME = "workflows/create";
public static final String NAME = TRANSPORT_ACION_NAME_PREFIX + "workflow/create";
/** An instance of this action */
public static final CreateWorkflowAction INSTANCE = new CreateWorkflowAction();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,17 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
}));
} else {
// Update existing entry, full document replacement
globalContextHandler.updateTemplate(request.getWorkflowId(), request.getTemplate(), ActionListener.wrap(response -> {
// TODO : Create StateIndexRequest for workflowId to reset entry to NOT_STARTED
listener.onResponse(new WorkflowResponse(response.getId()));
}, exception -> {
logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), exception.getMessage());
listener.onFailure(new FlowFrameworkException(exception.getMessage(), RestStatus.INTERNAL_SERVER_ERROR));
}));
globalContextHandler.updateTemplateInGlobalContext(
request.getWorkflowId(),
request.getTemplate(),
ActionListener.wrap(response -> {
// TODO : Create StateIndexRequest for workflowId to reset entry to NOT_STARTED
listener.onResponse(new WorkflowResponse(response.getId()));
}, exception -> {
logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), exception.getMessage());
listener.onFailure(new FlowFrameworkException(exception.getMessage(), RestStatus.INTERNAL_SERVER_ERROR));
})
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@

import org.opensearch.action.ActionType;

import static org.opensearch.flowframework.common.CommonValue.TRANSPORT_ACION_NAME_PREFIX;

/**
* External Action for public facing RestProvisionWorkflowAction
*/
public class ProvisionWorkflowAction extends ActionType<WorkflowResponse> {

// TODO : Determine external action prefix for plugin
/** The name of this action */
public static final String NAME = "workflows/provision";
public static final String NAME = TRANSPORT_ACION_NAME_PREFIX + "workflow/provision";
/** An instance of this action */
public static final ProvisionWorkflowAction INSTANCE = new ProvisionWorkflowAction();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void testPutTemplateToGlobalContext() throws IOException {
ActionListener<Boolean> callback = invocation.getArgument(1);
callback.onResponse(true);
return null;
}).when(createIndexStep).initIndexIfAbsent(any(), any());
}).when(createIndexStep).initIndexIfAbsent(any(FlowFrameworkIndex.class), any());

globalContextHandler.putTemplateToGlobalContext(template, listener);

Expand All @@ -110,7 +110,7 @@ public void testStoreResponseToGlobalContext() {
assertEquals(documentId, requestCaptor.getValue().id());
}

public void testUpdateTemplate() throws IOException {
public void testUpdateTemplateInGlobalContext() throws IOException {
Template template = mock(Template.class);
ActionListener<IndexResponse> listener = mock(ActionListener.class);
when(template.toDocumentSource(any(XContentBuilder.class), eq(ToXContent.EMPTY_PARAMS))).thenAnswer(invocation -> {
Expand All @@ -119,20 +119,20 @@ public void testUpdateTemplate() throws IOException {
});
when(createIndexStep.doesIndexExist(any())).thenReturn(true);

globalContextHandler.updateTemplate("1", template, null);
globalContextHandler.updateTemplateInGlobalContext("1", template, null);

ArgumentCaptor<IndexRequest> requestCaptor = ArgumentCaptor.forClass(IndexRequest.class);
verify(client, times(1)).index(requestCaptor.capture(), any());

assertEquals("1", requestCaptor.getValue().id());
}

public void testFailedUpdateTemplate() throws IOException {
public void testFailedUpdateTemplateInGlobalContext() throws IOException {
Template template = mock(Template.class);
ActionListener<IndexResponse> listener = mock(ActionListener.class);
when(createIndexStep.doesIndexExist(any())).thenReturn(false);

globalContextHandler.updateTemplate("1", template, listener);
globalContextHandler.updateTemplateInGlobalContext("1", template, listener);
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);

verify(listener, times(1)).onFailure(exceptionCaptor.capture());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void testCreateNewWorkflow() {
ActionListener<IndexResponse> responseListener = invocation.getArgument(1);
responseListener.onResponse(new IndexResponse(new ShardId(GLOBAL_CONTEXT_INDEX, "", 1), "1", 1L, 1L, 1L, true));
return null;
}).when(globalContextHandler).putTemplateToGlobalContext(any(), any());
}).when(globalContextHandler).putTemplateToGlobalContext(any(Template.class), any());

createWorkflowTransportAction.doExecute(mock(Task.class), createNewWorkflow, listener);
ArgumentCaptor<WorkflowResponse> responseCaptor = ArgumentCaptor.forClass(WorkflowResponse.class);
Expand All @@ -101,7 +101,7 @@ public void testFailedToCreateNewWorkflow() {
ActionListener<IndexResponse> responseListener = invocation.getArgument(1);
responseListener.onFailure(new Exception("Failed to create global_context index"));
return null;
}).when(globalContextHandler).putTemplateToGlobalContext(any(), any());
}).when(globalContextHandler).putTemplateToGlobalContext(any(Template.class), any());

createWorkflowTransportAction.doExecute(mock(Task.class), createNewWorkflow, listener);
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
Expand All @@ -118,7 +118,7 @@ public void testUpdateWorkflow() {
ActionListener<IndexResponse> responseListener = invocation.getArgument(2);
responseListener.onResponse(new IndexResponse(new ShardId(GLOBAL_CONTEXT_INDEX, "", 1), "1", 1L, 1L, 1L, true));
return null;
}).when(globalContextHandler).updateTemplate(any(), any(), any());
}).when(globalContextHandler).updateTemplateInGlobalContext(any(), any(Template.class), any());

createWorkflowTransportAction.doExecute(mock(Task.class), updateWorkflow, listener);
ArgumentCaptor<WorkflowResponse> responseCaptor = ArgumentCaptor.forClass(WorkflowResponse.class);
Expand All @@ -135,7 +135,7 @@ public void testFailedToUpdateWorkflow() {
ActionListener<IndexResponse> responseListener = invocation.getArgument(2);
responseListener.onFailure(new Exception("Failed to update use case template"));
return null;
}).when(globalContextHandler).updateTemplate(any(), any(), any());
}).when(globalContextHandler).updateTemplateInGlobalContext(any(), any(Template.class), any());

createWorkflowTransportAction.doExecute(mock(Task.class), updateWorkflow, listener);
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.flowframework.transport;

import org.opensearch.Version;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.client.Client;
Expand Down Expand Up @@ -111,7 +112,7 @@ public void testProvisionWorkflow() {
GetResult getResult = new GetResult(GLOBAL_CONTEXT_INDEX, workflowId, 1, 1, 1, true, templateBytesRef, null, null);
responseListener.onResponse(new GetResponse(getResult));
return null;
}).when(client).get(any(), any());
}).when(client).get(any(GetRequest.class), any());

provisionWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, listener);
ArgumentCaptor<WorkflowResponse> responseCaptor = ArgumentCaptor.forClass(WorkflowResponse.class);
Expand All @@ -126,7 +127,7 @@ public void testFailedToRetrieveTemplateFromGlobalContext() {
ActionListener<GetResponse> responseListener = invocation.getArgument(1);
responseListener.onFailure(new Exception("Failed to retrieve template from global context."));
return null;
}).when(client).get(any(), any());
}).when(client).get(any(GetRequest.class), any());

provisionWorkflowTransportAction.doExecute(mock(Task.class), request, listener);
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void testInitIndexIfAbsent_IndexNotPresent() {
ActionListener<Boolean> listener = mock(ActionListener.class);
createIndexStep.initIndexIfAbsent(FlowFrameworkIndex.GLOBAL_CONTEXT, listener);

verify(indicesAdminClient, times(1)).create(any(), any());
verify(indicesAdminClient, times(1)).create(any(CreateIndexRequest.class), any());
}

public void testInitIndexIfAbsent_IndexExist() {
Expand Down

0 comments on commit 4d96e50

Please sign in to comment.