diff --git a/plugin/src/main/java/org/opensearch/ml/autoredeploy/MLModelAutoReDeployer.java b/plugin/src/main/java/org/opensearch/ml/autoredeploy/MLModelAutoReDeployer.java index dc322f2836..01b4078984 100644 --- a/plugin/src/main/java/org/opensearch/ml/autoredeploy/MLModelAutoReDeployer.java +++ b/plugin/src/main/java/org/opensearch/ml/autoredeploy/MLModelAutoReDeployer.java @@ -18,6 +18,7 @@ import java.util.function.Consumer; import java.util.stream.Collectors; +import lombok.Setter; import org.opensearch.action.search.SearchAction; import org.opensearch.action.search.SearchRequestBuilder; import org.opensearch.action.search.SearchResponse; @@ -68,6 +69,9 @@ public class MLModelAutoReDeployer { private final SearchRequestBuilderFactory searchRequestBuilderFactory; + @Setter + private ActionListener startCronJobListener; + public MLModelAutoReDeployer( ClusterService clusterService, Client client, @@ -126,6 +130,7 @@ Consumer undeployModelsOnDataNodesConsumer() { public void buildAutoReloadArrangement(List addedNodes, String clusterManagerNodeId) { if (!enableAutoReDeployModel) { log.info("Model auto reload configuration is false, not performing auto reloading!"); + startCronjobAndClearListener(); return; } String localNodeId = clusterService.localNode().getId(); @@ -142,10 +147,12 @@ public void buildAutoReloadArrangement(List addedNodes, String clusterMa public void redeployAModel() { if (!enableAutoReDeployModel) { log.info("Model auto reload configuration is false, not performing auto reloading!"); + startCronjobAndClearListener(); return; } if (modelAutoRedeployArrangements.size() == 0) { log.info("No models needs to be auto redeployed!"); + startCronjobAndClearListener(); return; } ModelAutoRedeployArrangement modelAutoRedeployArrangement = modelAutoRedeployArrangements.poll(); @@ -176,9 +183,10 @@ private void triggerAutoDeployModels(List addedNodes) { }); redeployAModel(); } - }, - e -> { log.error("Failed to query need auto redeploy models, no action will be performed, addedNodes are: {}", addedNodes, e); } - ); + }, e -> { + log.error("Failed to query need auto redeploy models, no action will be performed, addedNodes are: {}", addedNodes, e); + startCronjobAndClearListener(); + }); queryRunningModels(listener); } @@ -296,6 +304,14 @@ private void triggerModelRedeploy(ModelAutoRedeployArrangement modelAutoRedeploy client.execute(MLDeployModelAction.INSTANCE, deployModelRequest, listener); } + private void startCronjobAndClearListener() { + boolean managerNode = clusterService.localNode().isClusterManagerNode(); + if (managerNode && startCronJobListener != null) { + startCronJobListener.onResponse(true); + startCronJobListener = null; + } + } + @Data @Builder static class ModelAutoRedeployArrangement { diff --git a/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java b/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java index 6e98e6414d..a741571187 100644 --- a/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java +++ b/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java @@ -14,6 +14,7 @@ import org.opensearch.common.lifecycle.LifecycleListener; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; import org.opensearch.ml.autoredeploy.MLModelAutoReDeployer; import org.opensearch.ml.engine.encryptor.Encryptor; import org.opensearch.ml.indices.MLIndicesHandler; @@ -22,7 +23,6 @@ import lombok.extern.log4j.Log4j2; -import java.util.Arrays; import java.util.List; @Log4j2 @@ -70,11 +70,20 @@ public MLCommonsClusterManagerEventListener( @Override public void onClusterManager() { + ActionListener listener = ActionListener.wrap(r -> { + if (syncModelRoutingCron == null) { + startSyncModelRoutingCron(); + } + }, e -> { + if (syncModelRoutingCron == null) { + startSyncModelRoutingCron(); + } + }); + mlModelAutoReDeployer.setStartCronJobListener(listener); String localNodeId = clusterService.localNode().getId(); - mlModelAutoReDeployer.buildAutoReloadArrangement(List.of(localNodeId), localNodeId); - if (syncModelRoutingCron == null) { - startSyncModelRoutingCron(); - } + threadPool.schedule(() -> mlModelAutoReDeployer.buildAutoReloadArrangement(List.of(localNodeId), localNodeId), + TimeValue.timeValueSeconds(jobInterval), + GENERAL_THREAD_POOL); } private void startSyncModelRoutingCron() { diff --git a/plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java b/plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java index 10f8b9502f..afeacaf4cd 100644 --- a/plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java +++ b/plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java @@ -119,8 +119,6 @@ public void run() { Set workerNodes = deployingModels.computeIfAbsent(modelId, it -> new HashSet<>()); workerNodes.add(nodeId); } - } else { - } String[] runningDeployModelTaskIds = response.getRunningDeployModelTaskIds();