Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Improve error messages for workflow states other than NOT_STARTED #653

Merged
merged 1 commit into from
Apr 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
### Documentation
### Maintenance
### Refactoring
- Improve error messages for workflow states other than NOT_STARTED ([#642](https://github.com/opensearch-project/flow-framework/pull/642))
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

Expand Down Expand Up @@ -419,8 +420,8 @@
}
doesTemplateExist(documentId, templateExists -> {
if (templateExists) {
isWorkflowNotStarted(documentId, workflowIsNotStarted -> {
if (workflowIsNotStarted || ignoreNotStartedCheck) {
getProvisioningProgress(documentId, progress -> {

Check warning on line 423 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L423

Added line #L423 was not covered by tests
if (ignoreNotStartedCheck || ProvisioningProgress.NOT_STARTED.equals(progress.orElse(null))) {
IndexRequest request = new IndexRequest(GLOBAL_CONTEXT_INDEX).id(documentId);
try (
XContentBuilder builder = XContentFactory.jsonBuilder();
Expand All @@ -436,7 +437,9 @@
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
} else {
String errorMessage = "The template has already been provisioned so it can't be updated: " + documentId;
String errorMessage = "The template can not be updated unless its provisioning state is NOT_STARTED: "

Check warning on line 440 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L440

Added line #L440 was not covered by tests
+ documentId
+ ". Deprovision the workflow to reset the state.";
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
}
Expand Down Expand Up @@ -474,36 +477,40 @@
}

/**
* Check if the workflow has been provisioned and executes the consumer by passing a boolean
* Check workflow provisioning state and executes the consumer
*
* @param documentId document id
* @param booleanResultConsumer boolean consumer function based on if workflow is provisioned or not
* @param provisioningProgressConsumer consumer function based on if workflow is provisioned.
* @param listener action listener
* @param <T> action listener response type
*/
public <T> void isWorkflowNotStarted(String documentId, Consumer<Boolean> booleanResultConsumer, ActionListener<T> listener) {
public <T> void getProvisioningProgress(
String documentId,
Consumer<Optional<ProvisioningProgress>> provisioningProgressConsumer,
ActionListener<T> listener
) {
GetRequest getRequest = new GetRequest(WORKFLOW_STATE_INDEX, documentId);
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
client.get(getRequest, ActionListener.wrap(response -> {
context.restore();
if (!response.isExists()) {
booleanResultConsumer.accept(false);
provisioningProgressConsumer.accept(Optional.empty());

Check warning on line 497 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L497

Added line #L497 was not covered by tests
return;
}
try (
XContentParser parser = ParseUtils.createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
WorkflowState workflowState = WorkflowState.parse(parser);
booleanResultConsumer.accept(workflowState.getProvisioningProgress().equals(ProvisioningProgress.NOT_STARTED.name()));
provisioningProgressConsumer.accept(Optional.of(ProvisioningProgress.valueOf(workflowState.getProvisioningProgress())));

Check warning on line 505 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L505

Added line #L505 was not covered by tests
} catch (Exception e) {
String errorMessage = "Failed to parse workflow state " + documentId;
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR));
}
}, exception -> {
logger.error("Failed to get workflow state for {} ", documentId);
booleanResultConsumer.accept(false);
provisioningProgressConsumer.accept(Optional.empty());

Check warning on line 513 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L513

Added line #L513 was not covered by tests
}));
} catch (Exception e) {
String errorMessage = "Failed to retrieve workflow state to check provisioning status";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.Map;
import java.util.stream.Collectors;

import static java.lang.Boolean.TRUE;
import static org.opensearch.flowframework.common.CommonValue.ERROR_FIELD;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD;
Expand Down Expand Up @@ -132,8 +131,8 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
);
workflowProcessSorter.validate(provisionProcessSequence, pluginsService);

flowFrameworkIndicesHandler.isWorkflowNotStarted(workflowId, workflowIsNotStarted -> {
if (TRUE.equals(workflowIsNotStarted)) {
flowFrameworkIndicesHandler.getProvisioningProgress(workflowId, progress -> {
if (ProvisioningProgress.NOT_STARTED.equals(progress.orElse(null))) {
// update state index
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
workflowId,
Expand Down Expand Up @@ -174,7 +173,11 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
})
);
} else {
String errorMessage = "The template has already been provisioned: " + workflowId;
String errorMessage = "The workflow provisioning state is "
+ (progress.isPresent() ? progress.get().toString() : "unknown")
+ " and can not be provisioned unless its state is NOT_STARTED: "
+ workflowId
+ ". Deprovision the workflow to reset the state.";
logger.info(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.TestHelpers;
import org.opensearch.flowframework.model.ProvisioningProgress;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.model.WorkflowState;
Expand All @@ -46,6 +47,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

Expand Down Expand Up @@ -253,7 +255,7 @@ public void testInitIndexIfAbsent_IndexNotPresent() {

public void testIsWorkflowProvisionedFailedParsing() {
String documentId = randomAlphaOfLength(5);
Consumer<Boolean> function = mock(Consumer.class);
Consumer<Optional<ProvisioningProgress>> function = mock(Consumer.class);
ActionListener<GetResponse> listener = mock(ActionListener.class);
WorkflowState workFlowState = new WorkflowState(
documentId,
Expand All @@ -277,7 +279,7 @@ public void testIsWorkflowProvisionedFailedParsing() {
responseListener.onResponse(new GetResponse(getResult));
return null;
}).when(client).get(any(GetRequest.class), any());
flowFrameworkIndicesHandler.isWorkflowNotStarted(documentId, function, listener);
flowFrameworkIndicesHandler.getProvisioningProgress(documentId, function, listener);
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
verify(listener, times(1)).onFailure(exceptionCaptor.capture());
assertTrue(exceptionCaptor.getValue().getMessage().contains("Failed to parse workflow state"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ public void testFailedUpdateWorkflow() throws Exception {
ResponseException.class,
() -> updateWorkflow(client(), workflowId, template)
);
assertTrue(exceptionProvisioned.getMessage().contains("The template has already been provisioned so it can't be updated"));
assertTrue(
exceptionProvisioned.getMessage().contains("The template can not be updated unless its provisioning state is NOT_STARTED")
);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.TestHelpers;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.model.ProvisioningProgress;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.model.WorkflowEdge;
Expand All @@ -40,6 +41,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -138,10 +140,10 @@ public void testProvisionWorkflow() {

// Bypass isWorkflowNotStarted and force true response
doAnswer(invocation -> {
Consumer<Boolean> boolConsumer = invocation.getArgument(1);
boolConsumer.accept(true);
Consumer<Optional<ProvisioningProgress>> progressConsumer = invocation.getArgument(1);
progressConsumer.accept(Optional.of(ProvisioningProgress.NOT_STARTED));
return null;
}).when(flowFrameworkIndicesHandler).isWorkflowNotStarted(any(), any(), any());
}).when(flowFrameworkIndicesHandler).getProvisioningProgress(any(), any(), any());

// Bypass updateFlowFrameworkSystemIndexDoc and stub on response
doAnswer(invocation -> {
Expand Down Expand Up @@ -185,10 +187,10 @@ public void testProvisionWorkflowTwice() {

// Bypass isWorkflowNotStarted and force false response
doAnswer(invocation -> {
Consumer<Boolean> boolConsumer = invocation.getArgument(1);
boolConsumer.accept(false);
Consumer<Optional<ProvisioningProgress>> progressConsumer = invocation.getArgument(1);
progressConsumer.accept(Optional.of(ProvisioningProgress.DONE));
return null;
}).when(flowFrameworkIndicesHandler).isWorkflowNotStarted(any(), any(), any());
}).when(flowFrameworkIndicesHandler).getProvisioningProgress(any(), any(), any());

// Bypass updateFlowFrameworkSystemIndexDoc and stub on response
doAnswer(invocation -> {
Expand All @@ -200,7 +202,10 @@ public void testProvisionWorkflowTwice() {
provisionWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, listener);
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
verify(listener, times(1)).onFailure(exceptionCaptor.capture());
assertEquals("The template has already been provisioned: 2", exceptionCaptor.getValue().getMessage());
assertEquals(
"The workflow provisioning state is DONE and can not be provisioned unless its state is NOT_STARTED: 2. Deprovision the workflow to reset the state.",
exceptionCaptor.getValue().getMessage()
);
}

public void testFailedToRetrieveTemplateFromGlobalContext() {
Expand Down
Loading