diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index ee376bf..69af39b 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -38,4 +38,4 @@ jobs:
         with:
           stack-name: ${{ matrix.stack-name }}
           python-version: ${{ matrix.python-version }}
-          ref-zenml: feature/OSS-2609-OSS-2575-model-config-is-model-version
+          ref-zenml: feature/OSS-2190-data-as-first-class-citizen
diff --git a/README.md b/README.md
index 16d0250..b3d460d 100644
--- a/README.md
+++ b/README.md
@@ -182,13 +182,15 @@ To make the most of the Model Control Plane we additionally annotate the output
 Code snippet 💻
 
 ```python
-from zenml.model import ModelArtifactConfig
+from zenml import ArtifactConfig
 
 experiment_tracker = Client().active_stack.experiment_tracker
 @step(experiment_tracker=experiment_tracker.name)
 def model_trainer(
     ...
-) -> Annotated[ClassifierMixin, "model", ModelArtifactConfig()]:
+) -> Annotated[
+    ClassifierMixin, ArtifactConfig(name="model", is_model_artifact=True)
+]:
     ...
 ```
 
@@ -308,7 +310,7 @@ You can follow [Data Validators docs](https://docs.zenml.io/stacks-and-component
 
 As a last step concluding all work done so far, we will calculate predictions on the inference dataset and persist them in [Artifact Store](https://docs.zenml.io/stacks-and-components/component-guide/artifact-stores) attached to the current inference model version of the Model Control Plane for reuse and observability.
 
-We will leverage a prepared predictions service called `mlflow_deployment` linked to the inference model version of the Model Control Plane to run `.predict()` and to put predictions as an output of the predictions step, so it is automatically stored in the [Artifact Store](https://docs.zenml.io/stacks-and-components/component-guide/artifact-stores) and linked to the Model Control Plane model version as a versioned artifact link with zero effort. This is achieved because we additionally annotated the `predictions` output with `DataArtifactConfig(overwrite=False)`. This is required to deliver a comprehensive history to stakeholders since Batch Inference can be executed using the same Model Control Plane version multiple times.
+We will leverage a prepared predictions service called `mlflow_deployment` linked to the inference model version of the Model Control Plane to run `.predict()` and to put predictions as an output of the predictions step, so it is automatically stored in the [Artifact Store](https://docs.zenml.io/stacks-and-components/component-guide/artifact-stores) and linked to the Model Control Plane model version as a versioned artifact link with zero effort.
 
 ```
 NOTE: On non-local orchestrators a `model` artifact will be loaded into memory to run predictions directly. You can adapt this part to your needs.
@@ -318,12 +320,10 @@ NOTE: On non-local orchestrators a `model` artifact will be loaded into memory t
 Code snippet 💻
 
 ```python
-from zenml.model import DataArtifactConfig
-
 @step
 def inference_predict(
     dataset_inf: pd.DataFrame,
-) -> Annotated[pd.Series, "predictions", DataArtifactConfig(overwrite=False)]:
+) -> Annotated[pd.Series, "predictions"]:
     model_version = get_step_context().model_version
 
     # get predictor
diff --git a/template/pipelines/batch_inference.py b/template/pipelines/batch_inference.py
index 60d064a..5a36344 100644
--- a/template/pipelines/batch_inference.py
+++ b/template/pipelines/batch_inference.py
@@ -10,8 +10,7 @@
     notify_on_failure,
     notify_on_success,
 )
-from zenml import pipeline
-from zenml.artifacts.external_artifact import ExternalArtifact
+from zenml import ExternalArtifact, pipeline
 from zenml.integrations.evidently.metrics import EvidentlyMetricConfig
 from zenml.integrations.evidently.steps import evidently_report_step
 from zenml.logger import get_logger
@@ -32,26 +31,19 @@ def {{product_name}}_batch_inference():
     # of one step as the input of the next step.
     ########## ETL stage ##########
     df_inference, target, _ = data_loader(
-        random_state=ExternalArtifact(
-            model_artifact_pipeline_name="{{product_name}}_training",
-            model_artifact_name="random_state",
-        ),
+        random_state=ExternalArtifact(name="random_state"),
         is_inference=True
     )
     df_inference = inference_data_preprocessor(
         dataset_inf=df_inference,
-        preprocess_pipeline=ExternalArtifact(
-            model_artifact_name="preprocess_pipeline",
-        ),
+        preprocess_pipeline=ExternalArtifact(name="preprocess_pipeline"),
         target=target,
     )
 
 {%- if data_quality_checks %}
     ########## DataQuality stage ##########
     report, _ = evidently_report_step(
-        reference_dataset=ExternalArtifact(
-            model_artifact_name="dataset_trn",
-        ),
+        reference_dataset=ExternalArtifact(name="dataset_trn"),
         comparison_dataset=df_inference,
         ignored_cols=["target"],
         metrics=[
diff --git a/template/run.py b/template/run.py
index 4f40e7d..60e4439 100644
--- a/template/run.py
+++ b/template/run.py
@@ -5,7 +5,7 @@
 import os
 from typing import Optional
 
-from zenml.artifacts.external_artifact import ExternalArtifact
+from zenml.client import Client
 from zenml.logger import get_logger
 
 from pipelines import {{product_name}}_batch_inference, {{product_name}}_training, {{product_name}}_deployment
@@ -192,17 +192,15 @@ def main(
         ] = f"{{product_name}}_batch_inference_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
         {{product_name}}_batch_inference.with_options(**pipeline_args)(**run_args_inference)
 
-        artifact = ExternalArtifact(
-            model_artifact_name="predictions",
-            model_name="{{ product_name }}",
-            model_version="{{ target_environment }}",
-            model_artifact_version=None,  # can be skipped - using latest artifact link
-        )
-        logger.info(
-            "Batch inference pipeline finished successfully! "
-            "You can find predictions in Artifact Store using ID: "
-            f"`{str(artifact.get_artifact_id())}`."
-        )
+        client = Client()
+        model_version = client.get_model_version("{{ product_name }}", "{{ target_environment }}")
+        artifact = model_version.get_data_artifact("predictions")
+        if artifact:
+            logger.info(
+                "Batch inference pipeline finished successfully! "
+                "You can find predictions in Artifact Store using ID: "
+                f"`{str(artifact.id)}`."
+            )
 
 
 if __name__ == "__main__":
diff --git a/template/steps/deployment/deployment_deploy.py b/template/steps/deployment/deployment_deploy.py
index d157363..9aa6227 100644
--- a/template/steps/deployment/deployment_deploy.py
+++ b/template/steps/deployment/deployment_deploy.py
@@ -4,14 +4,13 @@
 from typing import Optional
 
 from typing_extensions import Annotated
-from zenml import get_step_context, step
+from zenml import ArtifactConfig, get_step_context, step
 from zenml.client import Client
 from zenml.integrations.mlflow.services.mlflow_deployment import MLFlowDeploymentService
 from zenml.integrations.mlflow.steps.mlflow_deployer import (
     mlflow_model_registry_deployer_step,
 )
 from zenml.logger import get_logger
-from zenml.model import EndpointArtifactConfig
 
 logger = get_logger(__name__)
 
@@ -20,8 +19,7 @@
 def deployment_deploy() -> (
     Annotated[
         Optional[MLFlowDeploymentService],
-        "mlflow_deployment",
-        EndpointArtifactConfig(),
+        ArtifactConfig(name="mlflow_deployment", is_endpoint_artifact=True),
     ]
 ):
     """Predictions step.
diff --git a/template/steps/etl/inference_data_preprocessor.py b/template/steps/etl/inference_data_preprocessor.py
index 649b3ec..f98aa56 100644
--- a/template/steps/etl/inference_data_preprocessor.py
+++ b/template/steps/etl/inference_data_preprocessor.py
@@ -6,7 +6,6 @@
 import pandas as pd
 from sklearn.pipeline import Pipeline
 from zenml import step
-from zenml.model import DataArtifactConfig
 
 
 @step
@@ -14,11 +13,7 @@ def inference_data_preprocessor(
     dataset_inf: pd.DataFrame,
     preprocess_pipeline: Pipeline,
     target: str,
-) -> Annotated[
-    pd.DataFrame,
-    "dataset_inf",
-    DataArtifactConfig(overwrite=False, artifact_name="inference_dataset"),
-]:
+) -> Annotated[pd.DataFrame, "inference_dataset"]:
     """Data preprocessor step.
 
     This is an example of a data processor step that prepares the data so that
diff --git a/template/steps/inference/inference_predict.py b/template/steps/inference/inference_predict.py
index 2eeb729..0d3287b 100644
--- a/template/steps/inference/inference_predict.py
+++ b/template/steps/inference/inference_predict.py
@@ -8,7 +8,6 @@
 from zenml import get_step_context, step
 from zenml.integrations.mlflow.services.mlflow_deployment import MLFlowDeploymentService
 from zenml.logger import get_logger
-from zenml.model import DataArtifactConfig
 
 logger = get_logger(__name__)
 
@@ -16,7 +15,7 @@
 @step
 def inference_predict(
     dataset_inf: pd.DataFrame,
-) -> Annotated[pd.Series, "predictions", DataArtifactConfig(overwrite=False)]:
+) -> Annotated[pd.Series, "predictions"]:
     """Predictions step.
 
     This is an example of a predictions step that takes the data in and returns
@@ -39,9 +38,9 @@ def inference_predict(
     model_version = get_step_context().model_version
 
     # get predictor
-    predictor_service: Optional[MLFlowDeploymentService] = model_version.get_endpoint_artifact(
-        "mlflow_deployment"
-    ).load()
+    predictor_service: Optional[
+        MLFlowDeploymentService
+    ] = model_version.load_artifact("mlflow_deployment")
     if predictor_service is not None:
         # run prediction from service
         predictions = predictor_service.predict(request=dataset_inf)
@@ -51,7 +50,7 @@
             "as the orchestrator is not local."
         )
         # run prediction from memory
-        predictor = model_version.get_model_artifact("model").load()
+        predictor = model_version.load_artifact("model")
         predictions = predictor.predict(dataset_inf)
 
     predictions = pd.Series(predictions, name="predicted")
diff --git a/template/steps/promotion/{% if metric_compare_promotion %}compute_performance_metrics_on_current_data.py{% endif %} b/template/steps/promotion/{% if metric_compare_promotion %}compute_performance_metrics_on_current_data.py{% endif %}
index 2f09014..5af90c6 100644
--- a/template/steps/promotion/{% if metric_compare_promotion %}compute_performance_metrics_on_current_data.py{% endif %}
+++ b/template/steps/promotion/{% if metric_compare_promotion %}compute_performance_metrics_on_current_data.py{% endif %}
@@ -6,7 +6,7 @@ from typing_extensions import Annotated
 import pandas as pd
 from sklearn.metrics import accuracy_score
 from zenml import step, get_step_context
-from zenml.model import ModelVersion
+from zenml.model.model_version import ModelVersion
 from zenml.logger import get_logger
 
 logger = get_logger(__name__)
@@ -54,8 +54,8 @@ def compute_performance_metrics_on_current_data(
     else:
         # Get predictors
         predictors = {
-            latest_version_number: latest_version.get_model_artifact("model").load(),
-            current_version_number: current_version.get_model_artifact("model").load(),
+            latest_version_number: latest_version.load_artifact("model"),
+            current_version_number: current_version.load_artifact("model"),
         }
 
     metrics = {}
diff --git a/template/steps/promotion/{% if metric_compare_promotion %}promote_with_metric_compare.py{% endif %} b/template/steps/promotion/{% if metric_compare_promotion %}promote_with_metric_compare.py{% endif %}
index 7c6ac3d..712c420 100644
--- a/template/steps/promotion/{% if metric_compare_promotion %}promote_with_metric_compare.py{% endif %}
+++ b/template/steps/promotion/{% if metric_compare_promotion %}promote_with_metric_compare.py{% endif %}
@@ -1,7 +1,7 @@
 # {% include 'template/license_header' %}
 
 from zenml import get_step_context, step
-from zenml.model import ModelVersion
+from zenml.model.model_version import ModelVersion
 from zenml.logger import get_logger
 
 from utils import promote_in_model_registry
diff --git a/template/steps/promotion/{% if not metric_compare_promotion %}promote_latest_version.py{% endif %} b/template/steps/promotion/{% if not metric_compare_promotion %}promote_latest_version.py{% endif %}
index 776c6eb..9b8273f 100644
--- a/template/steps/promotion/{% if not metric_compare_promotion %}promote_latest_version.py{% endif %}
+++ b/template/steps/promotion/{% if not metric_compare_promotion %}promote_latest_version.py{% endif %}
@@ -1,7 +1,7 @@
 # {% include 'template/license_header' %}
 
 from zenml import get_step_context, step
-from zenml.model import ModelVersion
+from zenml.model.model_version import ModelVersion
 from zenml.logger import get_logger
 
 from utils import promote_in_model_registry
diff --git a/template/steps/training/model_trainer.py b/template/steps/training/model_trainer.py
index d992c72..39f7926 100644
--- a/template/steps/training/model_trainer.py
+++ b/template/steps/training/model_trainer.py
@@ -5,12 +5,11 @@
 import mlflow
 import pandas as pd
 from sklearn.base import ClassifierMixin
-from zenml import log_artifact_metadata, step
+from zenml import ArtifactConfig, log_artifact_metadata, step
 from zenml.client import Client
 from zenml.integrations.mlflow.experiment_trackers import MLFlowExperimentTracker
 from zenml.integrations.mlflow.steps.mlflow_registry import mlflow_register_model_step
 from zenml.logger import get_logger
-from zenml.model import ModelArtifactConfig
 
 logger = get_logger(__name__)
 
@@ -31,7 +30,9 @@ def model_trainer(
     model: ClassifierMixin,
     target: str,
     name: str,
-) -> Annotated[ClassifierMixin, "model", ModelArtifactConfig()]:
+) -> Annotated[
+    ClassifierMixin, ArtifactConfig(name="model", is_model_artifact=True)
+]:
     """Configure and train a model on the training dataset.
 
     This is an example of a model training step that takes in a dataset artifact
@@ -79,12 +80,14 @@
         name=name,
     )
     # keep track of mlflow version for future use
-    log_artifact_metadata(
-        output_name="model",
-        model_registry_version=Client()
-        .active_stack.model_registry.list_model_versions(name=name)[-1]
-        .version,
-    )
+    model_registry = Client().active_stack.model_registry
+    if model_registry:
+        versions = model_registry.list_model_versions(name=name)
+        if versions:
+            log_artifact_metadata(
+                metadata={"model_registry_version": versions[-1].version},
+                artifact_name="model",
+            )
     ### YOUR CODE ENDS HERE ###
 
     return model
diff --git a/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_select_best_model.py b/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_select_best_model.py
index c306ce0..8550f89 100644
--- a/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_select_best_model.py
+++ b/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_select_best_model.py
@@ -31,9 +31,7 @@ def hp_tuning_select_best_model(
     best_metric = -1
     # consume artifacts attached to current model version in Model Control Plane
     for step_name in step_names:
-        hp_output = model_version.get_data_artifact(
-            step_name=step_name, name="hp_result"
-        )
+        hp_output = model_version.get_data_artifact("hp_result")
         model: ClassifierMixin = hp_output.load()
         # fetch metadata we attached earlier
         metric = float(hp_output.run_metadata["metric"].value)
diff --git a/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_single_search.py b/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_single_search.py
index 6e2316a..d8290f1 100644
--- a/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_single_search.py
+++ b/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_single_search.py
@@ -80,8 +80,8 @@ def hp_tuning_single_search(
     score = accuracy_score(y_tst, y_pred)
     # log score along with output artifact as metadata
     log_artifact_metadata(
-        output_name="hp_result",
-        metric=float(score),
+        metadata={"metric": float(score)},
+        artifact_name="hp_result",
     )
     ### YOUR CODE ENDS HERE ###
     return cv.best_estimator_
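
Taken together, the hunks above migrate the template off the old `zenml.model` config classes (`ModelArtifactConfig`, `DataArtifactConfig`, `EndpointArtifactConfig`) onto the unified `ArtifactConfig`, and swap the `get_*_artifact(...).load()` lookups for name-based `load_artifact(...)` calls. A minimal sketch of the resulting pattern, using only API calls that appear in this diff — the step bodies and the `DummyClassifier` stand-in are illustrative placeholders, not part of the change:

```python
from typing_extensions import Annotated

import pandas as pd
from sklearn.base import ClassifierMixin
from sklearn.dummy import DummyClassifier
from zenml import ArtifactConfig, get_step_context, step


@step
def model_trainer(
    dataset_trn: pd.DataFrame,
) -> Annotated[
    ClassifierMixin,
    # One unified config class; the flag replaces the old ModelArtifactConfig()
    # (is_endpoint_artifact plays the same role for EndpointArtifactConfig).
    ArtifactConfig(name="model", is_model_artifact=True),
]:
    # Placeholder training logic; the real template trains the configured model.
    X, y = dataset_trn.drop(columns=["target"]), dataset_trn["target"]
    return DummyClassifier(strategy="most_frequent").fit(X, y)


@step
def inference_predict(dataset_inf: pd.DataFrame) -> Annotated[pd.Series, "predictions"]:
    model_version = get_step_context().model_version
    # Replaces model_version.get_model_artifact("model").load(): artifacts
    # attached to the model version are now loaded by name alone.
    predictor = model_version.load_artifact("model")
    return pd.Series(predictor.predict(dataset_inf), name="predicted")
```

Cross-pipeline inputs follow the same convention: `ExternalArtifact(name="random_state")` resolves by artifact name, and the old `model_artifact_pipeline_name`/`model_artifact_name` qualifiers are gone.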