Skip to content

Commit

Permalink
[Feature/agent_framework] Added Retry functionality for Deploy Model (#…
Browse files Browse the repository at this point in the history
…245)

* Added retry functionality for DeployModel

Signed-off-by: Owais Kazi <[email protected]>

* Fixed timeout and exception issues

Signed-off-by: Owais Kazi <[email protected]>

* Addressed PR Comments

Signed-off-by: Owais Kazi <[email protected]>

---------

Signed-off-by: Owais Kazi <[email protected]>
  • Loading branch information
owaiskazi19 authored and dbwiddis committed Dec 18, 2023
1 parent 8ee032b commit e0947ee
Show file tree
Hide file tree
Showing 10 changed files with 287 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public enum WorkflowResources {
REGISTER_LOCAL_MODEL("register_local_model", "model_id"),
/** official workflow step name for registering a model group and associated created resource */
REGISTER_MODEL_GROUP("register_model_group", "model_group_id"),
/** official workflow step name for deploying a model and associated created resource */
DEPLOY_MODEL("deploy_model", "model_id"),
/** official workflow step name for creating an ingest-pipeline and associated created resource */
CREATE_INGEST_PIPELINE("create_ingest_pipeline", "pipeline_id"),
/** official workflow step name for creating an index and associated created resource */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class WorkflowNode implements ToXContentObject {
/** The field defining the timeout value for this node */
public static final String NODE_TIMEOUT_FIELD = "node_timeout";
/** The default timeout value if the template doesn't override it */
public static final String NODE_TIMEOUT_DEFAULT_VALUE = "10s";
public static final String NODE_TIMEOUT_DEFAULT_VALUE = "15s";

private final String id; // unique id
private final String type; // maps to a WorkflowStep
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,133 @@
*/
package org.opensearch.flowframework.workflow;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.FutureUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.common.WorkflowResources;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.ml.client.MachineLearningNodeClient;
import org.opensearch.ml.common.MLTaskState;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

import static org.opensearch.flowframework.common.CommonValue.REGISTER_MODEL_STATUS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY;

/**
* Abstract retryable workflow step
*/
public abstract class AbstractRetryableWorkflowStep implements WorkflowStep {

private static final Logger logger = LogManager.getLogger(AbstractRetryableWorkflowStep.class);
/** The maximum number of transport request retries */
protected volatile Integer maxRetry;
private final MachineLearningNodeClient mlClient;
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;

/**
* Instantiates a new Retryable workflow step
* @param settings Environment settings
* @param clusterService the cluster service
* @param mlClient machine learning client
* @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices
*/
public AbstractRetryableWorkflowStep(Settings settings, ClusterService clusterService) {
public AbstractRetryableWorkflowStep(
Settings settings,
ClusterService clusterService,
MachineLearningNodeClient mlClient,
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler
) {
this.maxRetry = MAX_GET_TASK_REQUEST_RETRY.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_GET_TASK_REQUEST_RETRY, it -> maxRetry = it);
this.mlClient = mlClient;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
}

/**
* Retryable get ml task
* @param workflowId the workflow id
* @param nodeId the workflow node id
* @param future the workflow step future
* @param taskId the ml task id
* @param retries the current number of request retries
* @param workflowStep the workflow step which requires a retry get ml task functionality
*/
void retryableGetMlTask(
String workflowId,
String nodeId,
CompletableFuture<WorkflowData> future,
String taskId,
int retries,
String workflowStep
) {
mlClient.getTask(taskId, ActionListener.wrap(response -> {
MLTaskState currentState = response.getState();
if (currentState != MLTaskState.COMPLETED) {
if (Stream.of(MLTaskState.FAILED, MLTaskState.COMPLETED_WITH_ERROR).anyMatch(x -> x == currentState)) {
// Model registration failed or completed with errors
String errorMessage = workflowStep + " failed with error : " + response.getError();
logger.error(errorMessage);
future.completeExceptionally(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
} else {
// Task still in progress, attempt retry
throw new IllegalStateException(workflowStep + " is not yet completed");
}
} else {
try {
logger.info(workflowStep + " successful for {} and modelId {}", workflowId, response.getModelId());
String resourceName = WorkflowResources.getResourceByWorkflowStep(getName());
flowFrameworkIndicesHandler.updateResourceInStateIndex(
workflowId,
nodeId,
getName(),
response.getTaskId(),
ActionListener.wrap(updateResponse -> {
logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex());
future.complete(
new WorkflowData(
Map.ofEntries(
Map.entry(resourceName, response.getModelId()),
Map.entry(REGISTER_MODEL_STATUS, response.getState().name())
),
workflowId,
nodeId
)
);
}, exception -> {
logger.error("Failed to update new created resource", exception);
future.completeExceptionally(
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
})
);

} catch (Exception e) {
logger.error("Failed to parse and update new created resource", e);
future.completeExceptionally(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));
}
}
}, exception -> {
if (retries < maxRetry) {
// Sleep thread prior to retrying request
try {
Thread.sleep(5000);
} catch (Exception e) {
FutureUtils.cancel(future);
}
retryableGetMlTask(workflowId, nodeId, future, taskId, retries + 1, workflowStep);
} else {
logger.error("Failed to retrieve" + workflowStep + ",maximum retries exceeded");
future.completeExceptionally(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)));
}
}));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.ml.client.MachineLearningNodeClient;
import org.opensearch.ml.common.transport.deploy.MLDeployModelResponse;
Expand All @@ -27,18 +30,29 @@
/**
* Step to deploy a model
*/
public class DeployModelStep implements WorkflowStep {
public class DeployModelStep extends AbstractRetryableWorkflowStep {
private static final Logger logger = LogManager.getLogger(DeployModelStep.class);

private final MachineLearningNodeClient mlClient;
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;
static final String NAME = "deploy_model";

/**
* Instantiate this class
* @param settings The OpenSearch settings
* @param clusterService The cluster service
* @param mlClient client to instantiate MLClient
* @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices
*/
public DeployModelStep(MachineLearningNodeClient mlClient) {
public DeployModelStep(
Settings settings,
ClusterService clusterService,
MachineLearningNodeClient mlClient,
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler
) {
super(settings, clusterService, mlClient, flowFrameworkIndicesHandler);
this.mlClient = mlClient;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
}

@Override
Expand All @@ -55,13 +69,10 @@ public CompletableFuture<WorkflowData> execute(
@Override
public void onResponse(MLDeployModelResponse mlDeployModelResponse) {
logger.info("Model deployment state {}", mlDeployModelResponse.getStatus());
deployModelFuture.complete(
new WorkflowData(
Map.ofEntries(Map.entry("deploy_model_status", mlDeployModelResponse.getStatus())),
currentNodeInputs.getWorkflowId(),
currentNodeInputs.getNodeId()
)
);
String taskId = mlDeployModelResponse.getTaskId();

// Attempt to retrieve the model ID
retryableGetMlTask(currentNodeInputs.getWorkflowId(), currentNodeId, deployModelFuture, taskId, 0, "Deploy model");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.util.CollectionUtils;
import org.opensearch.flowframework.common.WorkflowResources;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
Expand Down Expand Up @@ -72,7 +73,7 @@ public CompletableFuture<WorkflowData> execute(
@Override
public void onResponse(MLRegisterModelGroupResponse mlRegisterModelGroupResponse) {
try {
logger.info("Remote Model registration successful");
logger.info("Model group registration successful");
String resourceName = WorkflowResources.getResourceByWorkflowStep(getName());
flowFrameworkIndicesHandler.updateResourceInStateIndex(
currentNodeInputs.getWorkflowId(),
Expand Down Expand Up @@ -134,7 +135,7 @@ public void onFailure(Exception e) {
if (description != null) {
builder.description(description);
}
if (!backendRoles.isEmpty()) {
if (!CollectionUtils.isEmpty(backendRoles)) {
builder.backendRoles(backendRoles);
}
if (modelAccessMode != null) {
Expand All @@ -160,6 +161,9 @@ public String getName() {

@SuppressWarnings("unchecked")
private List<String> getBackendRoles(Map<String, Object> content) {
return (List<String>) content.get(BACKEND_ROLES_FIELD);
if (content.containsKey(BACKEND_ROLES_FIELD)) {
return (List<String>) content.get(BACKEND_ROLES_FIELD);
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,12 @@
import org.opensearch.ExceptionsHelper;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.FutureUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.common.WorkflowResources;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.ml.client.MachineLearningNodeClient;
import org.opensearch.ml.common.MLTaskState;
import org.opensearch.ml.common.model.MLModelConfig;
import org.opensearch.ml.common.model.MLModelFormat;
import org.opensearch.ml.common.model.TextEmbeddingModelConfig;
Expand All @@ -34,7 +31,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

import static org.opensearch.flowframework.common.CommonValue.ALL_CONFIG;
import static org.opensearch.flowframework.common.CommonValue.DESCRIPTION_FIELD;
Expand All @@ -45,7 +41,6 @@
import static org.opensearch.flowframework.common.CommonValue.MODEL_GROUP_ID;
import static org.opensearch.flowframework.common.CommonValue.MODEL_TYPE;
import static org.opensearch.flowframework.common.CommonValue.NAME_FIELD;
import static org.opensearch.flowframework.common.CommonValue.REGISTER_MODEL_STATUS;
import static org.opensearch.flowframework.common.CommonValue.URL;
import static org.opensearch.flowframework.common.CommonValue.VERSION_FIELD;

Expand Down Expand Up @@ -75,7 +70,7 @@ public RegisterLocalModelStep(
MachineLearningNodeClient mlClient,
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler
) {
super(settings, clusterService);
super(settings, clusterService, mlClient, flowFrameworkIndicesHandler);
this.mlClient = mlClient;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
}
Expand All @@ -98,7 +93,14 @@ public void onResponse(MLRegisterModelResponse mlRegisterModelResponse) {
String taskId = mlRegisterModelResponse.getTaskId();

// Attempt to retrieve the model ID
retryableGetMlTask(currentNodeInputs.getWorkflowId(), currentNodeId, registerLocalModelFuture, taskId, 0);
retryableGetMlTask(
currentNodeInputs.getWorkflowId(),
currentNodeId,
registerLocalModelFuture,
taskId,
0,
"Local model registration"
);
}

@Override
Expand Down Expand Up @@ -178,84 +180,4 @@ public void onFailure(Exception e) {
public String getName() {
return NAME;
}

/**
* Retryable get ml task
* @param workflowId the workflow id
* @param nodeId the workflow node id
* @param registerLocalModelFuture the workflow step future
* @param taskId the ml task id
* @param retries the current number of request retries
*/
void retryableGetMlTask(
String workflowId,
String nodeId,
CompletableFuture<WorkflowData> registerLocalModelFuture,
String taskId,
int retries
) {
mlClient.getTask(taskId, ActionListener.wrap(response -> {
MLTaskState currentState = response.getState();
if (currentState != MLTaskState.COMPLETED) {
if (Stream.of(MLTaskState.FAILED, MLTaskState.COMPLETED_WITH_ERROR).anyMatch(x -> x == currentState)) {
// Model registration failed or completed with errors
String errorMessage = "Local model registration failed with error : " + response.getError();
logger.error(errorMessage);
registerLocalModelFuture.completeExceptionally(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
} else {
// Task still in progress, attempt retry
throw new IllegalStateException("Local model registration is not yet completed");
}
} else {
try {
logger.info("Local Model registration successful");
String resourceName = WorkflowResources.getResourceByWorkflowStep(getName());
flowFrameworkIndicesHandler.updateResourceInStateIndex(
workflowId,
nodeId,
getName(),
response.getTaskId(),
ActionListener.wrap(updateResponse -> {
logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex());
registerLocalModelFuture.complete(
new WorkflowData(
Map.ofEntries(
Map.entry(resourceName, response.getModelId()),
Map.entry(REGISTER_MODEL_STATUS, response.getState().name())
),
workflowId,
nodeId
)
);
}, exception -> {
logger.error("Failed to update new created resource", exception);
registerLocalModelFuture.completeExceptionally(
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
})
);

} catch (Exception e) {
logger.error("Failed to parse and update new created resource", e);
registerLocalModelFuture.completeExceptionally(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));
}
}
}, exception -> {
if (retries < maxRetry) {
// Sleep thread prior to retrying request
try {
Thread.sleep(5000);
} catch (Exception e) {
FutureUtils.cancel(registerLocalModelFuture);
}
final int retryAdd = retries + 1;
retryableGetMlTask(workflowId, nodeId, registerLocalModelFuture, taskId, retryAdd);
} else {
logger.error("Failed to retrieve local model registration task, maximum retries exceeded");
registerLocalModelFuture.completeExceptionally(
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
}
}));
}
}
Loading

0 comments on commit e0947ee

Please sign in to comment.