diff --git a/src/main/java/org/opensearch/flowframework/common/CommonValue.java b/src/main/java/org/opensearch/flowframework/common/CommonValue.java index f6d515f43..eb67f2dc4 100644 --- a/src/main/java/org/opensearch/flowframework/common/CommonValue.java +++ b/src/main/java/org/opensearch/flowframework/common/CommonValue.java @@ -26,10 +26,12 @@ public class CommonValue { /** Global Context index mapping version */ public static final Integer GLOBAL_CONTEXT_INDEX_VERSION = 1; + /** The transport action name prefix */ + public static final String TRANSPORT_ACION_NAME_PREFIX = "cluster:admin/opensearch/flow_framework/"; /** The base URI for this plugin's rest actions */ - public static final String AI_FLOW_FRAMEWORK_BASE_URI = "/_plugins/_flow_framework"; + public static final String FLOW_FRAMEWORK_BASE_URI = "/_plugins/_flow_framework"; /** The URI for this plugin's workflow rest actions */ - public static final String WORKFLOW_URI = AI_FLOW_FRAMEWORK_BASE_URI + "/workflow"; + public static final String WORKFLOW_URI = FLOW_FRAMEWORK_BASE_URI + "/workflow"; /** Field name for workflow Id, the document Id of the indexed use case template */ public static final String WORKFLOW_ID = "workflow_id"; /** The field name for provision workflow within a use case template*/ diff --git a/src/main/java/org/opensearch/flowframework/indices/GlobalContextHandler.java b/src/main/java/org/opensearch/flowframework/indices/GlobalContextHandler.java index 49f0888e6..53037d7ce 100644 --- a/src/main/java/org/opensearch/flowframework/indices/GlobalContextHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/GlobalContextHandler.java @@ -100,7 +100,7 @@ public void putTemplateToGlobalContext(Template template, ActionListener listener) { + public void updateTemplateInGlobalContext(String documentId, Template template, ActionListener listener) { if (!createIndexStep.doesIndexExist(GLOBAL_CONTEXT_INDEX)) { String exceptionMessage = "Failed to update template for workflow_id : " + documentId diff --git a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowAction.java b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowAction.java index 8e38c4189..0f49c826f 100644 --- a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowAction.java @@ -10,14 +10,15 @@ import org.opensearch.action.ActionType; +import static org.opensearch.flowframework.common.CommonValue.TRANSPORT_ACION_NAME_PREFIX; + /** * External Action for public facing RestCreateWorkflowActiom */ public class CreateWorkflowAction extends ActionType { - // TODO : Determine external action prefix for plugin /** The name of this action */ - public static final String NAME = "workflows/create"; + public static final String NAME = TRANSPORT_ACION_NAME_PREFIX + "workflow/create"; /** An instance of this action */ public static final CreateWorkflowAction INSTANCE = new CreateWorkflowAction(); diff --git a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java index abc704064..f4147b144 100644 --- a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java @@ -59,13 +59,17 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { - // TODO : Create StateIndexRequest for workflowId to reset entry to NOT_STARTED - listener.onResponse(new WorkflowResponse(response.getId())); - }, exception -> { - logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), exception.getMessage()); - listener.onFailure(new FlowFrameworkException(exception.getMessage(), RestStatus.INTERNAL_SERVER_ERROR)); - })); + globalContextHandler.updateTemplateInGlobalContext( + request.getWorkflowId(), + request.getTemplate(), + ActionListener.wrap(response -> { + // TODO : Create StateIndexRequest for workflowId to reset entry to NOT_STARTED + listener.onResponse(new WorkflowResponse(response.getId())); + }, exception -> { + logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), exception.getMessage()); + listener.onFailure(new FlowFrameworkException(exception.getMessage(), RestStatus.INTERNAL_SERVER_ERROR)); + }) + ); } } diff --git a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowAction.java b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowAction.java index 7167c4357..022e73488 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowAction.java @@ -10,14 +10,14 @@ import org.opensearch.action.ActionType; +import static org.opensearch.flowframework.common.CommonValue.TRANSPORT_ACION_NAME_PREFIX; + /** * External Action for public facing RestProvisionWorkflowAction */ public class ProvisionWorkflowAction extends ActionType { - - // TODO : Determine external action prefix for plugin /** The name of this action */ - public static final String NAME = "workflows/provision"; + public static final String NAME = TRANSPORT_ACION_NAME_PREFIX + "workflow/provision"; /** An instance of this action */ public static final ProvisionWorkflowAction INSTANCE = new ProvisionWorkflowAction(); diff --git a/src/test/java/org/opensearch/flowframework/indices/GlobalContextHandlerTests.java b/src/test/java/org/opensearch/flowframework/indices/GlobalContextHandlerTests.java index a09f27b99..800f1d49e 100644 --- a/src/test/java/org/opensearch/flowframework/indices/GlobalContextHandlerTests.java +++ b/src/test/java/org/opensearch/flowframework/indices/GlobalContextHandlerTests.java @@ -84,7 +84,7 @@ public void testPutTemplateToGlobalContext() throws IOException { ActionListener callback = invocation.getArgument(1); callback.onResponse(true); return null; - }).when(createIndexStep).initIndexIfAbsent(any(), any()); + }).when(createIndexStep).initIndexIfAbsent(any(FlowFrameworkIndex.class), any()); globalContextHandler.putTemplateToGlobalContext(template, listener); @@ -110,7 +110,7 @@ public void testStoreResponseToGlobalContext() { assertEquals(documentId, requestCaptor.getValue().id()); } - public void testUpdateTemplate() throws IOException { + public void testUpdateTemplateInGlobalContext() throws IOException { Template template = mock(Template.class); ActionListener listener = mock(ActionListener.class); when(template.toDocumentSource(any(XContentBuilder.class), eq(ToXContent.EMPTY_PARAMS))).thenAnswer(invocation -> { @@ -119,7 +119,7 @@ public void testUpdateTemplate() throws IOException { }); when(createIndexStep.doesIndexExist(any())).thenReturn(true); - globalContextHandler.updateTemplate("1", template, null); + globalContextHandler.updateTemplateInGlobalContext("1", template, null); ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(IndexRequest.class); verify(client, times(1)).index(requestCaptor.capture(), any()); @@ -127,12 +127,12 @@ public void testUpdateTemplate() throws IOException { assertEquals("1", requestCaptor.getValue().id()); } - public void testFailedUpdateTemplate() throws IOException { + public void testFailedUpdateTemplateInGlobalContext() throws IOException { Template template = mock(Template.class); ActionListener listener = mock(ActionListener.class); when(createIndexStep.doesIndexExist(any())).thenReturn(false); - globalContextHandler.updateTemplate("1", template, listener); + globalContextHandler.updateTemplateInGlobalContext("1", template, listener); ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); verify(listener, times(1)).onFailure(exceptionCaptor.capture()); diff --git a/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java index 1df6ef23e..1b937570b 100644 --- a/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java @@ -83,7 +83,7 @@ public void testCreateNewWorkflow() { ActionListener responseListener = invocation.getArgument(1); responseListener.onResponse(new IndexResponse(new ShardId(GLOBAL_CONTEXT_INDEX, "", 1), "1", 1L, 1L, 1L, true)); return null; - }).when(globalContextHandler).putTemplateToGlobalContext(any(), any()); + }).when(globalContextHandler).putTemplateToGlobalContext(any(Template.class), any()); createWorkflowTransportAction.doExecute(mock(Task.class), createNewWorkflow, listener); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(WorkflowResponse.class); @@ -101,7 +101,7 @@ public void testFailedToCreateNewWorkflow() { ActionListener responseListener = invocation.getArgument(1); responseListener.onFailure(new Exception("Failed to create global_context index")); return null; - }).when(globalContextHandler).putTemplateToGlobalContext(any(), any()); + }).when(globalContextHandler).putTemplateToGlobalContext(any(Template.class), any()); createWorkflowTransportAction.doExecute(mock(Task.class), createNewWorkflow, listener); ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); @@ -118,7 +118,7 @@ public void testUpdateWorkflow() { ActionListener responseListener = invocation.getArgument(2); responseListener.onResponse(new IndexResponse(new ShardId(GLOBAL_CONTEXT_INDEX, "", 1), "1", 1L, 1L, 1L, true)); return null; - }).when(globalContextHandler).updateTemplate(any(), any(), any()); + }).when(globalContextHandler).updateTemplateInGlobalContext(any(), any(Template.class), any()); createWorkflowTransportAction.doExecute(mock(Task.class), updateWorkflow, listener); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(WorkflowResponse.class); @@ -135,7 +135,7 @@ public void testFailedToUpdateWorkflow() { ActionListener responseListener = invocation.getArgument(2); responseListener.onFailure(new Exception("Failed to update use case template")); return null; - }).when(globalContextHandler).updateTemplate(any(), any(), any()); + }).when(globalContextHandler).updateTemplateInGlobalContext(any(), any(Template.class), any()); createWorkflowTransportAction.doExecute(mock(Task.class), updateWorkflow, listener); ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); diff --git a/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java index c58e810b9..7e1e13e03 100644 --- a/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java @@ -9,6 +9,7 @@ package org.opensearch.flowframework.transport; import org.opensearch.Version; +import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; import org.opensearch.action.support.ActionFilters; import org.opensearch.client.Client; @@ -111,7 +112,7 @@ public void testProvisionWorkflow() { GetResult getResult = new GetResult(GLOBAL_CONTEXT_INDEX, workflowId, 1, 1, 1, true, templateBytesRef, null, null); responseListener.onResponse(new GetResponse(getResult)); return null; - }).when(client).get(any(), any()); + }).when(client).get(any(GetRequest.class), any()); provisionWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, listener); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(WorkflowResponse.class); @@ -126,7 +127,7 @@ public void testFailedToRetrieveTemplateFromGlobalContext() { ActionListener responseListener = invocation.getArgument(1); responseListener.onFailure(new Exception("Failed to retrieve template from global context.")); return null; - }).when(client).get(any(), any()); + }).when(client).get(any(GetRequest.class), any()); provisionWorkflowTransportAction.doExecute(mock(Task.class), request, listener); ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); diff --git a/src/test/java/org/opensearch/flowframework/workflow/CreateIndexStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/CreateIndexStepTests.java index 171ab8ae8..a952f6fe7 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/CreateIndexStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/CreateIndexStepTests.java @@ -128,7 +128,7 @@ public void testInitIndexIfAbsent_IndexNotPresent() { ActionListener listener = mock(ActionListener.class); createIndexStep.initIndexIfAbsent(FlowFrameworkIndex.GLOBAL_CONTEXT, listener); - verify(indicesAdminClient, times(1)).create(any(), any()); + verify(indicesAdminClient, times(1)).create(any(CreateIndexRequest.class), any()); } public void testInitIndexIfAbsent_IndexExist() {