Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.17] Fix PR #2976 bug caused by not including function_name and algorithm when querying models #3114

Merged
merged 1 commit into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ private void triggerUndeployModelsOnDataNodes(List<String> dataNodeIds) {
client.execute(MLUndeployModelAction.INSTANCE, undeployModelNodesRequest, undeployModelListener);
}
}
}, e -> { log.error("Failed to query need undeploy models, no action will be performed"); });
}, e -> { log.error("Failed to query need undeploy models, no action will be performed", e); });
queryRunningModels(listener);
}

Expand All @@ -241,7 +241,9 @@ private void queryRunningModels(ActionListener<SearchResponse> listener) {
String[] includes = new String[] {
MLModel.AUTO_REDEPLOY_RETRY_TIMES_FIELD,
MLModel.PLANNING_WORKER_NODES_FIELD,
MLModel.DEPLOY_TO_ALL_NODES_FIELD };
MLModel.DEPLOY_TO_ALL_NODES_FIELD,
MLModel.FUNCTION_NAME_FIELD,
MLModel.ALGORITHM_FIELD };

String[] excludes = new String[] { MLModel.MODEL_CONTENT_FIELD, MLModel.OLD_MODEL_CONTENT_FIELD };
FetchSourceContext fetchContext = new FetchSourceContext(true, includes, excludes);
Expand All @@ -257,12 +259,24 @@ private void queryRunningModels(ActionListener<SearchResponse> listener) {

@SuppressWarnings("unchecked")
private void triggerModelRedeploy(ModelAutoRedeployArrangement modelAutoRedeployArrangement) {
if (modelAutoRedeployArrangement == null) {
log.info("No more models in arrangement, skipping the redeployment");
return;
}
String modelId = modelAutoRedeployArrangement.getSearchResponse().getId();
List<String> addedNodes = modelAutoRedeployArrangement.getAddedNodes();
Map<String, Object> sourceAsMap = modelAutoRedeployArrangement.getSearchResponse().getSourceAsMap();
String functionName = (String) Optional
.ofNullable(sourceAsMap.get(MLModel.FUNCTION_NAME_FIELD))
.orElse(sourceAsMap.get(MLModel.ALGORITHM_FIELD));
if (functionName == null) {
log
.error(
"Model function_name or algorithm is null, model is not in correct status, please check the model, model id is: {}",
modelId
);
return;
}
if (FunctionName.REMOTE == FunctionName.from(functionName)) {
log.info("Skipping redeploying remote model {} as remote model deployment can be done at prediction time.", modelId);
return;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.ml.autoredeploy;

import static org.opensearch.ml.common.MLTask.MODEL_ID_FIELD;

import java.nio.file.Files;
import java.nio.file.Path;

import org.opensearch.ml.common.MLTaskState;
import org.opensearch.ml.rest.MLCommonsRestTestCase;

import lombok.SneakyThrows;

/**
 * Integration test for the model auto-redeploy flow.
 *
 * <p>The test registers the same small traced model twice and deploys both copies back to back.
 * It passes only if both deploy tasks reach {@link MLTaskState#COMPLETED}.
 */
public class MLModelAutoReDeployerIT extends MLCommonsRestTestCase {

    /** Classpath location of the register-model request body used for both models. */
    private static final String MODEL_REQUEST_RESOURCE = "org/opensearch/ml/autoredeploy/TracedSmallModelRequest.json";

    /**
     * Runs the two-model register/deploy scenario; any failure inside it fails the test.
     */
    public void testModelAutoRedeploy() {
        prepareModel();
    }

    /**
     * Registers two models from the same request body, waits for both registrations to complete,
     * then deploys the first model and, while that deployment is in flight, deploys the second one.
     * Both deploy tasks are required to complete.
     */
    @SneakyThrows
    private void prepareModel() {
        String requestBody = Files.readString(Path.of(this.getClass().getClassLoader().getResource(MODEL_REQUEST_RESOURCE).toURI()));
        String registerFirstModelTaskId = registerModel(requestBody);
        String registerSecondModelTaskId = registerModel(requestBody);
        waitForTask(registerFirstModelTaskId, MLTaskState.COMPLETED);
        // The second registration must also be finished before its model id is read and deployed;
        // otherwise the test races against the registration task. Waiting here (before any deploy
        // is issued) keeps the two deploy requests back to back.
        waitForTask(registerSecondModelTaskId, MLTaskState.COMPLETED);
        getTask(client(), registerFirstModelTaskId, response -> {
            String firstModelId = (String) response.get(MODEL_ID_FIELD);
            try {
                String deployFirstModelTaskId = deployModel(firstModelId);
                getTask(client(), registerSecondModelTaskId, response1 -> {
                    String secondModelId = (String) response1.get(MODEL_ID_FIELD);
                    try {
                        /*
                         * At this point the model auto redeployer should be querying the deploying / deploy failed /
                         * partially deployed models. The original deploy model task should be able to complete
                         * successfully; if it does not, it means
                         * org.opensearch.ml.action.forward.TransportForwardAction.triggerNextModelDeployAndCheckIfRestRetryTimes
                         * may have thrown an exception caused by
                         * org.opensearch.ml.autoredeploy.MLModelAutoReDeployer#redeployAModel.
                         * The auto redeploy constructs an arrangement with two models: the first model's deploy-done
                         * event triggers the auto redeploy of the next model, and if any error occurs during that step
                         * the first model's deploy task status is never updated to completed. So if this IT passes,
                         * the trigger for the next model's auto redeploy is working correctly.
                         */
                        String deploySecondModelTaskId = deployModel(secondModelId);
                        waitForTask(deploySecondModelTaskId, MLTaskState.COMPLETED);
                    } catch (Exception e) {
                        // Log with the stack trace before failing, consistent with the outer handler.
                        logger.error(e.getMessage(), e);
                        fail(e.getMessage());
                    }
                });
                waitForTask(deployFirstModelTaskId, MLTaskState.COMPLETED);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                fail(e.getMessage());
            }
        });
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"name": "traced_small_model",
"version": "1.0.0",
"model_format": "TORCH_SCRIPT",
"model_task_type": "text_embedding",
"model_content_hash_value": "e13b74006290a9d0f58c1376f9629d4ebc05a0f9385f40db837452b167ae9021",
"model_config": {
"model_type": "bert",
"embedding_dimension": 768,
"framework_type": "sentence_transformers",
"all_config": "{\"architectures\":[\"BertModel\"],\"max_position_embeddings\":512,\"model_type\":\"bert\",\"num_attention_heads\":12,\"num_hidden_layers\":6}"
},
"url": "https://github.com/opensearch-project/ml-commons/blob/2.x/ml-algorithms/src/test/resources/org/opensearch/ml/engine/algorithms/text_embedding/traced_small_model.zip?raw=true"
}
Loading