Skip to content

Commit

Permalink
Prevent provisioning an already-provisioned workflow (opensearch-proj…
Browse files Browse the repository at this point in the history
…ect#466)

* Prevent provisioning an already-provisioned workflow

Signed-off-by: Daniel Widdis <[email protected]>

* Don't provision on failed state index update

Signed-off-by: Daniel Widdis <[email protected]>

---------

Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis authored Jan 29, 2024
1 parent 693b71d commit 5d452f3
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,8 @@ public void updateTemplateInGlobalContext(String documentId, Template template,
}
doesTemplateExist(documentId, templateExists -> {
if (templateExists) {
isWorkflowProvisioned(documentId, workflowIsProvisioned -> {
if (workflowIsProvisioned) {
isWorkflowNotStarted(documentId, workflowIsNotStarted -> {
if (workflowIsNotStarted) {
IndexRequest request = new IndexRequest(GLOBAL_CONTEXT_INDEX).id(documentId);
try (
XContentBuilder builder = XContentFactory.jsonBuilder();
Expand Down Expand Up @@ -474,7 +474,7 @@ public <T> void doesTemplateExist(String documentId, Consumer<Boolean> booleanRe
* @param listener action listener
* @param <T> action listener response type
*/
public <T> void isWorkflowProvisioned(String documentId, Consumer<Boolean> booleanResultConsumer, ActionListener<T> listener) {
public <T> void isWorkflowNotStarted(String documentId, Consumer<Boolean> booleanResultConsumer, ActionListener<T> listener) {
GetRequest getRequest = new GetRequest(WORKFLOW_STATE_INDEX, documentId);
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
client.get(getRequest, ActionListener.wrap(response -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class CreateWorkflowTransportAction extends HandledTransportAction<Workfl
private final PluginsService pluginsService;

/**
* Intantiates a new CreateWorkflowTransportAction
* Instantiates a new CreateWorkflowTransportAction
* @param transportService the TransportService
* @param actionFilters action filters
* @param workflowProcessSorter the workflow process sorter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,21 +130,32 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
List<ProcessNode> provisionProcessSequence = workflowProcessSorter.sortProcessNodes(provisionWorkflow, workflowId);
workflowProcessSorter.validate(provisionProcessSequence, pluginsService);

flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
workflowId,
Map.ofEntries(
Map.entry(STATE_FIELD, State.PROVISIONING),
Map.entry(PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.IN_PROGRESS),
Map.entry(PROVISION_START_TIME_FIELD, Instant.now().toEpochMilli()),
Map.entry(RESOURCES_CREATED_FIELD, Collections.emptyList())
),
ActionListener.wrap(updateResponse -> {
logger.info("updated workflow {} state to PROVISIONING", request.getWorkflowId());
listener.onResponse(new WorkflowResponse(workflowId));
}, exception -> { logger.error("Failed to update workflow state : {}", exception.getMessage()); })
);

executeWorkflowAsync(workflowId, provisionProcessSequence, listener);
flowFrameworkIndicesHandler.isWorkflowNotStarted(workflowId, workflowIsNotStarted -> {
if (workflowIsNotStarted) {
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
workflowId,
Map.ofEntries(
Map.entry(STATE_FIELD, State.PROVISIONING),
Map.entry(PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.IN_PROGRESS),
Map.entry(PROVISION_START_TIME_FIELD, Instant.now().toEpochMilli()),
Map.entry(RESOURCES_CREATED_FIELD, Collections.emptyList())
),
ActionListener.wrap(updateResponse -> {
logger.info("updated workflow {} state to {}", request.getWorkflowId(), State.PROVISIONING);
executeWorkflowAsync(workflowId, provisionProcessSequence, listener);
listener.onResponse(new WorkflowResponse(workflowId));
}, exception -> {
String errorMessage = "Failed to update wowrfow state: " + workflowId;
logger.error(errorMessage, exception);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
})
);
} else {
String errorMessage = "The template has already been provisioned: " + workflowId;
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
}
}, listener);

}, exception -> {
if (exception instanceof FlowFrameworkException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ public void testIsWorkflowProvisionedFailedParsing() {
responseListener.onResponse(new GetResponse(getResult));
return null;
}).when(client).get(any(GetRequest.class), any());
flowFrameworkIndicesHandler.isWorkflowProvisioned(documentId, function, listener);
flowFrameworkIndicesHandler.isWorkflowNotStarted(documentId, function, listener);
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
verify(listener, times(1)).onFailure(exceptionCaptor.capture());
assertTrue(exceptionCaptor.getValue().getMessage().contains("Failed to parse workflow state"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import org.mockito.ArgumentCaptor;

Expand Down Expand Up @@ -129,6 +130,13 @@ public void testProvisionWorkflow() {

when(encryptorUtils.decryptTemplateCredentials(any())).thenReturn(template);

// Bypass isWorkflowNotStarted and force true response
doAnswer(invocation -> {
Consumer<Boolean> boolConsumer = invocation.getArgument(1);
boolConsumer.accept(true);
return null;
}).when(flowFrameworkIndicesHandler).isWorkflowNotStarted(any(), any(), any());

// Bypass updateFlowFrameworkSystemIndexDoc and stub on response
doAnswer(invocation -> {
ActionListener<UpdateResponse> actionListener = invocation.getArgument(2);
Expand All @@ -142,6 +150,47 @@ public void testProvisionWorkflow() {
assertEquals(workflowId, responseCaptor.getValue().getWorkflowId());
}

public void testProvisionWorkflowTwice() {

String workflowId = "2";
@SuppressWarnings("unchecked")
ActionListener<WorkflowResponse> listener = mock(ActionListener.class);
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null);

// Bypass client.get and stub success case
doAnswer(invocation -> {
ActionListener<GetResponse> responseListener = invocation.getArgument(1);

XContentBuilder builder = XContentFactory.jsonBuilder();
this.template.toXContent(builder, null);
BytesReference templateBytesRef = BytesReference.bytes(builder);
GetResult getResult = new GetResult(GLOBAL_CONTEXT_INDEX, workflowId, 1, 1, 1, true, templateBytesRef, null, null);
responseListener.onResponse(new GetResponse(getResult));
return null;
}).when(client).get(any(GetRequest.class), any());

when(encryptorUtils.decryptTemplateCredentials(any())).thenReturn(template);

// Bypass isWorkflowNotStarted and force false response
doAnswer(invocation -> {
Consumer<Boolean> boolConsumer = invocation.getArgument(1);
boolConsumer.accept(false);
return null;
}).when(flowFrameworkIndicesHandler).isWorkflowNotStarted(any(), any(), any());

// Bypass updateFlowFrameworkSystemIndexDoc and stub on response
doAnswer(invocation -> {
ActionListener<UpdateResponse> actionListener = invocation.getArgument(2);
actionListener.onResponse(mock(UpdateResponse.class));
return null;
}).when(flowFrameworkIndicesHandler).updateFlowFrameworkSystemIndexDoc(any(), any(), any());

provisionWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, listener);
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
verify(listener, times(1)).onFailure(exceptionCaptor.capture());
assertEquals("The template has already been provisioned: 2", exceptionCaptor.getValue().getMessage());
}

public void testFailedToRetrieveTemplateFromGlobalContext() {
@SuppressWarnings("unchecked")
ActionListener<WorkflowResponse> listener = mock(ActionListener.class);
Expand Down

0 comments on commit 5d452f3

Please sign in to comment.