diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4d0b1e9..ee376bf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -38,4 +38,4 @@ jobs: with: stack-name: ${{ matrix.stack-name }} python-version: ${{ matrix.python-version }} - ref-zenml: develop + ref-zenml: feature/OSS-2609-OSS-2575-model-config-is-model-version diff --git a/README.md b/README.md index f991418..50e1700 100644 --- a/README.md +++ b/README.md @@ -94,10 +94,8 @@ We will be going section by section diving into implementation details and shari Training pipeline is designed to create a new Model Control Plane version and promote it to inference stage upon successfully passing the quality assurance at the end of the pipeline. This ensures that we always infer only on quality-assured Model Control Plane version and provides a seamless integration of required artifacts of this Model Control Plane version later on inference runs. This is achieved by providing this configuration in `train_config.yaml` used to configure our pipeline: ```yaml -model_config: +model_version: name: your_product_name - ... - create_new_model_version: true ``` ### [Continuous Training] Training Pipeline: ETL steps @@ -145,20 +143,20 @@ After the steps are executed we need to collect results (one best model per each ```python from zenml import get_step_context -model_version = get_step_context().model_config._get_model_version() +model_version = get_step_context().model_version best_model = None best_metric = -1 # consume artifacts attached to current model version in Model Control Plane -for full_artifact_name in model_version.artifact_object_ids: - # if artifacts comes from one of HP tuning steps - if full_artifact_name.endswith("hp_result"): - hp_output = model_version.artifacts[full_artifact_name]["1"] - model: ClassifierMixin = hp_output.load() - # fetch metadata we attached earlier - metric = float(hp_output.metadata["metric"].value) - if best_model is None or best_metric < metric: - best_model = model +for step_name in step_names: + hp_output = model_version.get_data_artifact( + step_name=step_name, name="hp_result" + ) + model: ClassifierMixin = hp_output.load() + # fetch metadata we attached earlier + metric = float(hp_output.metadata["metric"].value) + if best_model is None or best_metric < metric: + best_model = model ``` @@ -239,7 +237,7 @@ By doing so we ensure that the best-performing version will be used for inferenc The Deployment pipeline is designed to run with inference Model Control Plane version context. This ensures that we always infer only on quality-assured Model Control Plane version and provide seamless integration of required artifacts created during training of this Model Control Plane version. This is achieved by providing this configuration in `deployer_config.yaml` used to configure our pipeline: ```yaml -model_config: +model_version: name: your_product_name version: production ``` @@ -259,7 +257,7 @@ NOTE: In this template a prediction service is only created for local orchestrat The Batch Inference pipeline is designed to run with inference Model Control Plane version context. This ensures that we always infer only on quality-assured Model Control Plane version and provide seamless integration of required artifacts created during training of this Model Control Plane version. This is achieved by providing this configuration in `inference_config.yaml` used to configure our pipeline: ```yaml -model_config: +model_version: name: your_product_name version: production ``` @@ -324,10 +322,22 @@ from zenml.model import ArtifactConfig @step def inference_predict( - deployment_service: MLFlowDeploymentService, dataset_inf: pd.DataFrame, ) -> Annotated[pd.Series, "predictions", ArtifactConfig(overwrite=False)]: - predictions = deployment_service.predict(request=dataset_inf) + model_version = get_step_context().model_version + + # get predictor + predictor_service: Optional[MLFlowDeploymentService] = model_version.get_endpoint_artifact( + "mlflow_deployment" + ).load() + if predictor_service is not None: + # run prediction from service + predictions = predictor_service.predict(request=dataset_inf) + else: + # run prediction from memory + predictor = model_version.get_model_artifact("model").load() + predictions = predictor.predict(dataset_inf) + predictions = pd.Series(predictions, name="predicted") return predictions ``` diff --git a/template/configs/deployer_config.yaml b/template/configs/deployer_config.yaml index d152dde..0cd227e 100644 --- a/template/configs/deployer_config.yaml +++ b/template/configs/deployer_config.yaml @@ -21,7 +21,7 @@ steps: notify_on_success: False # configuration of the Model Control Plane -model_config: +model_version: name: {{ product_name }} version: {{ target_environment }} diff --git a/template/configs/inference_config.yaml b/template/configs/inference_config.yaml index d152dde..0cd227e 100644 --- a/template/configs/inference_config.yaml +++ b/template/configs/inference_config.yaml @@ -21,7 +21,7 @@ steps: notify_on_success: False # configuration of the Model Control Plane -model_config: +model_version: name: {{ product_name }} version: {{ target_environment }} diff --git a/template/configs/train_config.yaml b/template/configs/train_config.yaml index 517812d..5a946db 100644 --- a/template/configs/train_config.yaml +++ b/template/configs/train_config.yaml @@ -35,7 +35,7 @@ steps: notify_on_success: False # configuration of the Model Control Plane -model_config: +model_version: name: {{ product_name }} license: {{ open_source_license }} description: {{ product_name }} E2E Batch Use Case @@ -52,7 +52,6 @@ model_config: - sklearn - from template - ZenML delivered - create_new_model_version: true # pipeline level extra configurations extra: diff --git a/template/pipelines/batch_inference.py b/template/pipelines/batch_inference.py index 56ddf6d..60d064a 100644 --- a/template/pipelines/batch_inference.py +++ b/template/pipelines/batch_inference.py @@ -11,10 +11,10 @@ notify_on_success, ) from zenml import pipeline +from zenml.artifacts.external_artifact import ExternalArtifact from zenml.integrations.evidently.metrics import EvidentlyMetricConfig from zenml.integrations.evidently.steps import evidently_report_step from zenml.logger import get_logger -from zenml.artifacts.external_artifact import ExternalArtifact logger = get_logger(__name__) diff --git a/template/pipelines/deployment.py b/template/pipelines/deployment.py index aa4a21c..3101aa8 100644 --- a/template/pipelines/deployment.py +++ b/template/pipelines/deployment.py @@ -1,7 +1,6 @@ # {% include 'template/license_header' %} -from steps import deployment_deploy,notify_on_success,notify_on_failure - +from steps import deployment_deploy, notify_on_failure, notify_on_success from zenml import pipeline diff --git a/template/pipelines/training.py b/template/pipelines/training.py index b89eb96..bd1a647 100644 --- a/template/pipelines/training.py +++ b/template/pipelines/training.py @@ -98,7 +98,7 @@ def {{product_name}}_training( target=target, ) after.append(step_name) - best_model = hp_tuning_select_best_model(after=after) + best_model = hp_tuning_select_best_model(step_names=after, after=after) {%- else %} model_configuration = pipeline_extra["model_configuration"] best_model = get_model_from_config( diff --git a/template/steps/deployment/deployment_deploy.py b/template/steps/deployment/deployment_deploy.py index fd5d0d4..edfb7d0 100644 --- a/template/steps/deployment/deployment_deploy.py +++ b/template/steps/deployment/deployment_deploy.py @@ -2,19 +2,16 @@ from typing import Optional -from typing_extensions import Annotated -from zenml import step, get_step_context +from typing_extensions import Annotated +from zenml import get_step_context, step from zenml.client import Client -from zenml.logger import get_logger -from zenml.model import DeploymentArtifactConfig +from zenml.integrations.mlflow.services.mlflow_deployment import MLFlowDeploymentService from zenml.integrations.mlflow.steps.mlflow_deployer import ( mlflow_model_registry_deployer_step, ) -from zenml.integrations.mlflow.services.mlflow_deployment import ( - MLFlowDeploymentService, -) -from utils import get_model_registry_version +from zenml.logger import get_logger +from zenml.model import DeploymentArtifactConfig logger = get_logger(__name__) @@ -47,12 +44,14 @@ def deployment_deploy() -> ( """ ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### if Client().active_stack.orchestrator.flavor == "local": - model_version = get_step_context().model_config._get_model_version() + model_version = get_step_context().model_version # deploy predictor service deployment_service = mlflow_model_registry_deployer_step.entrypoint( - registry_model_name=model_version.model.name, - registry_model_version=get_model_registry_version(model_version), + registry_model_name=model_version.name, + registry_model_version=model_version.get_model_artifact("model") + .metadata["model_registry_version"] + .value, replace_existing=True, ) else: diff --git a/template/steps/etl/data_loader.py b/template/steps/etl/data_loader.py index 5fdc7c3..97861ee 100644 --- a/template/steps/etl/data_loader.py +++ b/template/steps/etl/data_loader.py @@ -2,10 +2,10 @@ from typing import Tuple -from typing_extensions import Annotated import pandas as pd from sklearn.datasets import load_breast_cancer +from typing_extensions import Annotated from zenml import step from zenml.logger import get_logger diff --git a/template/steps/inference/inference_predict.py b/template/steps/inference/inference_predict.py index d92e4de..190da25 100644 --- a/template/steps/inference/inference_predict.py +++ b/template/steps/inference/inference_predict.py @@ -5,12 +5,10 @@ from typing_extensions import Annotated import pandas as pd -from zenml import step, get_step_context +from zenml import get_step_context, step +from zenml.integrations.mlflow.services.mlflow_deployment import MLFlowDeploymentService from zenml.logger import get_logger from zenml.model import ArtifactConfig -from zenml.integrations.mlflow.services.mlflow_deployment import ( - MLFlowDeploymentService, -) logger = get_logger(__name__) @@ -38,10 +36,10 @@ def inference_predict( The predictions as pandas series """ ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### - model_version = get_step_context().model_config._get_model_version() + model_version = get_step_context().model_version # get predictor - predictor_service: Optional[MLFlowDeploymentService] = model_version.get_deployment( + predictor_service: Optional[MLFlowDeploymentService] = model_version.get_endpoint_artifact( "mlflow_deployment" ).load() if predictor_service is not None: @@ -53,7 +51,7 @@ def inference_predict( "as the orchestrator is not local." ) # run prediction from memory - predictor = model_version.get_model_object("model").load() + predictor = model_version.get_model_artifact("model").load() predictions = predictor.predict(dataset_inf) predictions = pd.Series(predictions, name="predicted") diff --git a/template/steps/promotion/{% if metric_compare_promotion %}compute_performance_metrics_on_current_data.py{% endif %} b/template/steps/promotion/{% if metric_compare_promotion %}compute_performance_metrics_on_current_data.py{% endif %} index 6bb12e9..2f09014 100644 --- a/template/steps/promotion/{% if metric_compare_promotion %}compute_performance_metrics_on_current_data.py{% endif %} +++ b/template/steps/promotion/{% if metric_compare_promotion %}compute_performance_metrics_on_current_data.py{% endif %} @@ -5,11 +5,10 @@ from typing_extensions import Annotated import pandas as pd from sklearn.metrics import accuracy_score -from zenml import step +from zenml import step, get_step_context +from zenml.model import ModelVersion from zenml.logger import get_logger -from utils import get_model_versions - logger = get_logger(__name__) @step @@ -43,21 +42,27 @@ def compute_performance_metrics_on_current_data( logger.info("Evaluating model metrics...") # Get model version numbers from Model Control Plane - latest_version, current_version = get_model_versions(target_env) + latest_version = get_step_context().model_version + current_version = ModelVersion(name=latest_version.name, version=target_env) + + latest_version_number = latest_version.number + current_version_number = current_version.number - # Get predictors - predictors = { - latest_version.number: latest_version.get_model_object("model").load(), - current_version.number: current_version.get_model_object("model").load(), - } + if current_version_number is None: + current_version_number = -1 + metrics = {latest_version_number:1.0,current_version_number:0.0} + else: + # Get predictors + predictors = { + latest_version_number: latest_version.get_model_artifact("model").load(), + current_version_number: current_version.get_model_artifact("model").load(), + } - if latest_version != current_version: metrics = {} - for version in [latest_version.number,current_version.number]: + for version in [latest_version_number,current_version_number]: # predict and evaluate predictions = predictors[version].predict(X) metrics[version]=accuracy_score(y, predictions) - else: - metrics = {latest_version.number:1.0,current_version.number:0.0} + ### YOUR CODE ENDS HERE ### - return metrics[latest_version.number],metrics[current_version.number] + return metrics[latest_version_number],metrics[current_version_number] diff --git a/template/steps/promotion/{% if metric_compare_promotion %}promote_with_metric_compare.py{% endif %} b/template/steps/promotion/{% if metric_compare_promotion %}promote_with_metric_compare.py{% endif %} index 9e9dc73..7e31b9e 100644 --- a/template/steps/promotion/{% if metric_compare_promotion %}promote_with_metric_compare.py{% endif %} +++ b/template/steps/promotion/{% if metric_compare_promotion %}promote_with_metric_compare.py{% endif %} @@ -1,9 +1,10 @@ # {% include 'template/license_header' %} from zenml import get_step_context, step +from zenml.model import ModelVersion from zenml.logger import get_logger -from utils import get_model_versions, promote_in_model_registry, get_model_registry_version +from utils import promote_in_model_registry logger = get_logger(__name__) @@ -43,9 +44,13 @@ def promote_with_metric_compare( should_promote = True # Get model version numbers from Model Control Plane - latest_version, current_version = get_model_versions(target_env) + latest_version = get_step_context().model_version + current_version = ModelVersion(name=latest_version.name, version=target_env) - if latest_version.number == current_version.number: + latest_version_number = latest_version.number + current_version_number = current_version.number + + if current_version_number is None: logger.info("No current model version found - promoting latest") else: logger.info( @@ -62,21 +67,28 @@ def promote_with_metric_compare( ) should_promote = False - promoted_version = get_model_registry_version(current_version) + if should_promote: # Promote in Model Control Plane - model_version = get_step_context().model_config._get_model_version() + model_version = get_step_context().model_version model_version.set_stage(stage=target_env, force=True) logger.info(f"Current model version was promoted to '{target_env}'.") # Promote in Model Registry + latest_version_model_registry_number = latest_version.get_model_artifact("model").metadata["model_registry_version"].value + if current_version_number is None: + current_version_model_registry_number = latest_version_model_registry_number + else: + current_version_model_registry_number = current_version.get_model_artifact("model").metadata["model_registry_version"].value promote_in_model_registry( - latest_version=get_model_registry_version(latest_version), - current_version=get_model_registry_version(current_version), + latest_version=latest_version_model_registry_number, + current_version=current_version_model_registry_number, model_name=mlflow_model_name, target_env=target_env.capitalize(), ) - promoted_version = get_model_registry_version(latest_version) + promoted_version = latest_version_model_registry_number + else: + promoted_version = current_version.get_model_artifact("model").metadata["model_registry_version"].value logger.info( f"Current model version in `{target_env}` is `{promoted_version}` registered in Model Registry" diff --git a/template/steps/promotion/{% if not metric_compare_promotion %}promote_latest_version.py{% endif %} b/template/steps/promotion/{% if not metric_compare_promotion %}promote_latest_version.py{% endif %} index 17299a1..0cad25d 100644 --- a/template/steps/promotion/{% if not metric_compare_promotion %}promote_latest_version.py{% endif %} +++ b/template/steps/promotion/{% if not metric_compare_promotion %}promote_latest_version.py{% endif %} @@ -1,9 +1,10 @@ # {% include 'template/license_header' %} from zenml import get_step_context, step +from zenml.model import ModelVersion from zenml.logger import get_logger -from utils import get_model_versions, promote_in_model_registry, get_model_registry_version +from utils import promote_in_model_registry logger = get_logger(__name__) @@ -21,18 +22,24 @@ def promote_latest_version( ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### # Get model version numbers from Model Control Plane - latest_version, current_version = get_model_versions(target_env) + latest_version = get_step_context().model_version + current_version = ModelVersion(name=latest_version.name, version=target_env) logger.info(f"Promoting latest model version `{latest_version}`") # Promote in Model Control Plane - model_version = get_step_context().model_config._get_model_version() + model_version = get_step_context().model_version model_version.set_stage(stage=target_env, force=True) logger.info(f"Current model version was promoted to '{target_env}'.") # Promote in Model Registry + latest_version_model_registry_number = latest_version.get_model_artifact("model").metadata["model_registry_version"].value + if current_version.number is None: + current_version_model_registry_number = latest_version_model_registry_number + else: + current_version_model_registry_number = current_version.get_model_artifact("model").metadata["model_registry_version"].value promote_in_model_registry( - latest_version=get_model_registry_version(latest_version), - current_version=get_model_registry_version(current_version), + latest_version=latest_version_model_registry_number, + current_version=current_version_model_registry_number, model_name=mlflow_model_name, target_env=target_env.capitalize(), ) diff --git a/template/steps/training/model_trainer.py b/template/steps/training/model_trainer.py index 9896165..d992c72 100644 --- a/template/steps/training/model_trainer.py +++ b/template/steps/training/model_trainer.py @@ -1,17 +1,16 @@ # {% include 'template/license_header' %} - from typing_extensions import Annotated import mlflow import pandas as pd from sklearn.base import ClassifierMixin -from zenml import step, log_artifact_metadata +from zenml import log_artifact_metadata, step from zenml.client import Client from zenml.integrations.mlflow.experiment_trackers import MLFlowExperimentTracker +from zenml.integrations.mlflow.steps.mlflow_registry import mlflow_register_model_step from zenml.logger import get_logger from zenml.model import ModelArtifactConfig -from zenml.integrations.mlflow.steps.mlflow_registry import mlflow_register_model_step logger = get_logger(__name__) diff --git a/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_select_best_model.py b/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_select_best_model.py index ea78e7f..a5d5b72 100644 --- a/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_select_best_model.py +++ b/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_select_best_model.py @@ -1,10 +1,10 @@ # {% include 'template/license_header' %} +from typing import List from typing_extensions import Annotated from sklearn.base import ClassifierMixin - from zenml import get_step_context, step from zenml.logger import get_logger @@ -12,7 +12,9 @@ @step -def hp_tuning_select_best_model() -> Annotated[ClassifierMixin, "best_model"]: +def hp_tuning_select_best_model( + step_names: List[str], +) -> Annotated[ClassifierMixin, "best_model"]: """Find best model across all HP tuning attempts. This is an example of a model hyperparameter tuning step that loops @@ -23,19 +25,19 @@ def hp_tuning_select_best_model() -> Annotated[ClassifierMixin, "best_model"]: The best possible model class and its' parameters. """ ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### - model_version = get_step_context().model_config._get_model_version() + model_version = get_step_context().model_version best_model = None best_metric = -1 # consume artifacts attached to current model version in Model Control Plane - for full_artifact_name in model_version.artifact_object_ids: - # if artifacts comes from one of HP tuning steps - if full_artifact_name.endswith("hp_result"): - hp_output = model_version.artifacts[full_artifact_name]["1"] - model: ClassifierMixin = hp_output.load() - # fetch metadata we attached earlier - metric = float(hp_output.metadata["metric"].value) - if best_model is None or best_metric < metric: - best_model = model + for step_name in step_names: + hp_output = model_version.get_data_artifact( + step_name=step_name, name="hp_result" + ) + model: ClassifierMixin = hp_output.load() + # fetch metadata we attached earlier + metric = float(hp_output.metadata["metric"].value) + if best_model is None or best_metric < metric: + best_model = model ### YOUR CODE ENDS HERE ### return best_model diff --git a/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_single_search.py b/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_single_search.py index e88ed42..6e2316a 100644 --- a/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_single_search.py +++ b/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_single_search.py @@ -8,10 +8,9 @@ from sklearn.base import ClassifierMixin from sklearn.metrics import accuracy_score from sklearn.model_selection import RandomizedSearchCV -from zenml import step, log_artifact_metadata -from zenml.logger import get_logger - from utils import get_model_from_config +from zenml import log_artifact_metadata, step +from zenml.logger import get_logger logger = get_logger(__name__) diff --git a/template/utils/__init__.py b/template/utils/__init__.py index ca47d21..1c99e51 100644 --- a/template/utils/__init__.py +++ b/template/utils/__init__.py @@ -2,5 +2,4 @@ from .get_model_from_config import get_model_from_config -from .model_versions import get_model_versions, get_model_registry_version from .promote_in_model_registry import promote_in_model_registry diff --git a/template/utils/model_versions.py b/template/utils/model_versions.py deleted file mode 100644 index 083926b..0000000 --- a/template/utils/model_versions.py +++ /dev/null @@ -1,43 +0,0 @@ -# {% include 'template/license_header' %} - -from typing import Tuple -from typing_extensions import Annotated -from zenml import get_step_context -from zenml.model import ModelConfig -from zenml.models.model_models import ModelVersionResponseModel - - -def get_model_versions( - target_env: str, -) -> Tuple[ - Annotated[ModelVersionResponseModel, "latest_version"], - Annotated[ModelVersionResponseModel, "current_version"], -]: - """Get latest and currently promoted model versions from Model Control Plane. - - Args: - target_env: Target stage to search for currently promoted version - - Returns: - Latest and currently promoted model versions from the Model Control Plane - """ - latest_version = get_step_context().model_config._get_model_version() - try: - current_version = ModelConfig( - name=latest_version.model.name, version=target_env - )._get_model_version() - except KeyError: - current_version = latest_version - - return latest_version, current_version - - -def get_model_registry_version(model_version: ModelVersionResponseModel): - """Get model version in model registry from metadata of a model in the Model Control Plane. - - Args: - model_version: the Model Control Plane version response - """ - return ( - model_version.get_model_object("model").metadata["model_registry_version"].value - )