Skip to content

Commit

Permalink
Backport #577 to 2.x (#591)
Browse files Browse the repository at this point in the history
Adding new exception type for workflow step failures (#577)

* new exception type for workflow step failures



* reverting change on rest classes



* adding integ tests for new exception written to state index



---------

Signed-off-by: Amit Galitzky <[email protected]>
  • Loading branch information
amitgalitz authored Mar 19, 2024
1 parent e2aa3b9 commit fcc41bb
Show file tree
Hide file tree
Showing 24 changed files with 154 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
- Added an optional workflow_step param to the get workflow steps API ([#538](https://github.com/opensearch-project/flow-framework/pull/538))
- Add created, updated, and provisioned timestamps to saved template ([#551](https://github.com/opensearch-project/flow-framework/pull/551))
- Enable Flow Framework by default ([#553](https://github.com/opensearch-project/flow-framework/pull/553))
- Adding new exception type for workflow step failures ([#577](https://github.com/opensearch-project/flow-framework/pull/577))

### Bug Fixes
### Infrastructure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class FlowFrameworkException extends RuntimeException implements ToXConte
private static final long serialVersionUID = 1L;

/** The rest status code of this exception */
private final RestStatus restStatus;
protected final RestStatus restStatus;

/**
* Constructor with error message.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.exception;

import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Representation of an exception that is caused by a workflow step failing outside of our plugin
* This is caught by an external client (e.g. ml-client) returning the failure
*/
public class WorkflowStepException extends FlowFrameworkException implements ToXContentObject {

private static final long serialVersionUID = 1L;

/**
* Constructor with error message.
*
* @param message message of the exception
* @param restStatus HTTP status code of the response
*/
public WorkflowStepException(String message, RestStatus restStatus) {
super(message, restStatus);
}

/**
* Constructor with specified cause.
* @param cause exception cause
* @param restStatus HTTP status code of the response
*/
public WorkflowStepException(Throwable cause, RestStatus restStatus) {
super(cause, restStatus);
}

/**
* Constructor with specified error message adn cause.
* @param message error message
* @param cause exception cause
* @param restStatus HTTP status code of the response
*/
public WorkflowStepException(String message, Throwable cause, RestStatus restStatus) {
super(message, cause, restStatus);
}

/**
* Getter for restStatus.
*
* @return the HTTP status code associated with the exception
*/
public RestStatus getRestStatus() {
return restStatus;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().field("error", this.getMessage()).endObject();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to create workflow.", ExceptionsHelper.status(exception));
: new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception));
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to delete workflow.", ExceptionsHelper.status(exception));
: new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception));
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
}
// Validate content
if (request.hasContent()) {
// BaseRestHandler will give appropriate error message
return channel -> channel.sendResponse(null);
throw new FlowFrameworkException("deprovision request should have no payload", RestStatus.BAD_REQUEST);
}
// Validate params
if (workflowId == null) {
Expand All @@ -82,7 +81,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to deprovision workflow.", ExceptionsHelper.status(exception));
: new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception));
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to get workflow state.", ExceptionsHelper.status(exception));
: new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception));
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to get workflow step.", ExceptionsHelper.status(exception));
: new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception));
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException("Failed to provision workflow.", ExceptionsHelper.status(exception));
: new FlowFrameworkException("Failed to get workflow.", ExceptionsHelper.status(exception));
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,18 @@ private void executeWorkflow(List<ProcessNode> workflowSequence, String workflow
}, exception -> { logger.error("Failed to update workflow state for workflow {}", workflowId, exception); })
);
} catch (Exception ex) {
RestStatus status;
if (ex instanceof FlowFrameworkException) {
status = ((FlowFrameworkException) ex).getRestStatus();
} else {
status = ExceptionsHelper.status(ex);
}
logger.error("Provisioning failed for workflow {} during step {}.", workflowId, currentStepId, ex);
String errorMessage = (ex.getCause() == null ? ex.getClass().getName() : ex.getCause().getClass().getName())
+ " during step "
+ currentStepId;
+ currentStepId
+ ", restStatus: "
+ status.toString();
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
workflowId,
Map.ofEntries(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;

Expand Down Expand Up @@ -140,7 +141,7 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) {
public void onFailure(Exception e) {
String errorMessage = "Failed step " + pipelineToBeCreated;
logger.error(errorMessage, e);
createPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
createPipelineFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}

};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.ml.client.MachineLearningNodeClient;
Expand Down Expand Up @@ -214,7 +215,7 @@ public PlainActionFuture<WorkflowData> execute(
}, exception -> {
String errorMessage = "Failed to register local model in step " + currentNodeId;
logger.error(errorMessage, exception);
registerLocalModelFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
registerLocalModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(exception)));
}));
} catch (FlowFrameworkException e) {
registerLocalModelFuture.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.ml.client.MachineLearningNodeClient;
import org.opensearch.ml.common.MLTask;
Expand Down Expand Up @@ -127,7 +128,7 @@ protected void retryableGetMlTask(
}, exception -> {
String errorMessage = workflowStep + " failed";
logger.error(errorMessage, exception);
mlTaskListener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
mlTaskListener.onFailure(new WorkflowStepException(errorMessage, RestStatus.BAD_REQUEST));
}));
try {
Thread.sleep(this.retryDuration.getMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.ml.client.MachineLearningNodeClient;
Expand Down Expand Up @@ -123,7 +124,7 @@ public void onResponse(MLCreateConnectorResponse mlCreateConnectorResponse) {
public void onFailure(Exception e) {
String errorMessage = "Failed to create connector";
logger.error(errorMessage, e);
createConnectorFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
createConnectorFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.ml.client.MachineLearningNodeClient;

Expand Down Expand Up @@ -84,7 +85,7 @@ public void onResponse(DeleteResponse deleteResponse) {
public void onFailure(Exception e) {
String errorMessage = "Failed to delete agent " + agentId;
logger.error(errorMessage, e);
deleteAgentFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
deleteAgentFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
});
} catch (FlowFrameworkException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.ml.client.MachineLearningNodeClient;

Expand Down Expand Up @@ -84,7 +85,7 @@ public void onResponse(DeleteResponse deleteResponse) {
public void onFailure(Exception e) {
String errorMessage = "Failed to delete connector " + connectorId;
logger.error(errorMessage, e);
deleteConnectorFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
deleteConnectorFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
});
} catch (FlowFrameworkException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.ml.client.MachineLearningNodeClient;

Expand Down Expand Up @@ -85,7 +86,7 @@ public void onResponse(DeleteResponse deleteResponse) {
public void onFailure(Exception e) {
String errorMessage = "Failed to delete model " + modelId;
logger.error(errorMessage, e);
deleteModelFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
deleteModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
});
} catch (FlowFrameworkException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.ml.client.MachineLearningNodeClient;
Expand Down Expand Up @@ -117,7 +118,7 @@ public void onResponse(MLDeployModelResponse mlDeployModelResponse) {
public void onFailure(Exception e) {
String errorMessage = "Failed to deploy model " + modelId;
logger.error(errorMessage, e);
deployModelFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
deployModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
});
} catch (FlowFrameworkException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.common.Nullable;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.ml.client.MachineLearningNodeClient;
Expand Down Expand Up @@ -135,7 +136,7 @@ public void onResponse(MLRegisterAgentResponse mlRegisterAgentResponse) {
public void onFailure(Exception e) {
String errorMessage = "Failed to register the agent";
logger.error(errorMessage, e);
registerAgentModelFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
registerAgentModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.util.CollectionUtils;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.ml.client.MachineLearningNodeClient;
Expand Down Expand Up @@ -118,7 +119,7 @@ public void onResponse(MLRegisterModelGroupResponse mlRegisterModelGroupResponse
public void onFailure(Exception e) {
String errorMessage = "Failed to register model group";
logger.error(errorMessage, e);
registerModelGroupFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
registerModelGroupFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
};

Expand Down
Loading

0 comments on commit fcc41bb

Please sign in to comment.