diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6fe0ba6..4d0b1e9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -27,6 +27,8 @@ jobs: ZENML_DEBUG: true ZENML_ANALYTICS_OPT_IN: false ZENML_LOGGING_VERBOSITY: INFO + # fork fix for macos + OBJC_DISABLE_INITIALIZE_FORK_SAFETY: YES steps: - name: Check out repository code uses: actions/checkout@v3 diff --git a/.github/workflows/image-optimizer.yml b/.github/workflows/image-optimizer.yml new file mode 100644 index 0000000..dddbd1e --- /dev/null +++ b/.github/workflows/image-optimizer.yml @@ -0,0 +1,26 @@ +name: Compress Images +on: + pull_request: + # Run Image Actions when JPG, JPEG, PNG or WebP files are added or changed. + # See https://help.github.com/en/actions/automating-your-workflow-with-github-actions/workflow-syntax-for-github-actions#onpushpull_requestpaths for reference. + paths: + - '**.jpg' + - '**.jpeg' + - '**.png' + - '**.webp' +jobs: + build: + # Only run on non-draft PRs within the same repository. + if: github.event.pull_request.head.repo.full_name == github.repository && github.event.pull_request.draft == false + name: calibreapp/image-actions + runs-on: ubuntu-latest + steps: + - name: Checkout Repo + uses: actions/checkout@v3 + + - name: Compress Images + uses: calibreapp/image-actions@main + with: + # The `GITHUB_TOKEN` is automatically generated by GitHub and scoped only to the repository that is currently running the action. By default, the action can’t update Pull Requests initiated from forked repositories. + # See https://docs.github.com/en/actions/reference/authentication-in-a-workflow and https://help.github.com/en/articles/virtual-environments-for-github-actions#token-permissions + githubToken: ${{ secrets.GITHUB_TOKEN }} diff --git a/README.md b/README.md index 95fd681..f991418 100644 --- a/README.md +++ b/README.md @@ -7,24 +7,26 @@ to top it all off, a simple but useful CLI. This is exactly what the ZenML templates are all about. This project template is a great starting point for anyone looking to deepen their knowledge of ZenML. -It consists of two pipelines with the following high-level setup: +It consists of three pipelines with the following high-level setup:

- +

-Both pipelines are inside a shared Model Control Plane model context - training pipeline creates and promotes new Model Control Plane version and inference pipeline is reading from inference Model Control Plane version. This makes those pipelines closely connected, while ensuring that only quality assured Model Control Plane versions are used to produce predictions delivered to stakeholders.
+All pipelines are leveraging the Model Control Plane to bring all parts together - the training pipeline creates and promotes a new Model Control Plane version with a trained model object in it, the deployment pipeline uses the inference Model Control Plane version (the one promoted during training) to create a deployment service, and the batch inference pipeline uses the deployment service from the inference Model Control Plane version and stores a new set of predictions back as a versioned data artifact for future use. This makes those pipelines closely connected while ensuring that only quality-assured Model Control Plane versions are used to produce predictions delivered to stakeholders.
 * [CT] Training
   * Load, split, and preprocess the training dataset
   * Search for an optimal model object architecture and tune its hyperparameters
   * Train the model object and evaluate its performance on the holdout set
   * Compare a recently trained model object with one promoted earlier
-  * If a recently trained model object performs better, then stage it as a new inference model object in the model registry
-  * On success of the current model object, then stage newly created Model Control Plane version as the one used for inference
+  * If a recently trained model object performs better - stage it as a new inference model object in the model registry
+  * On success of the current model object - stage the newly created Model Control Plane version as the one used for inference
+* [CD] Deployment
+  * Deploy a new prediction service based on the model object connected to the inference Model Control Plane version.
 * [CD] Batch Inference
   * Load the inference dataset and preprocess it reusing object fitted during training
   * Perform data drift analysis reusing training dataset of the inference Model Control Plane version as a reference
   * Run predictions using a model object from the inference Model Control Plane version
-  * Store predictions as a versioned artifact and link it to the inference Model Control Plane version
+  * Store predictions as a versioned artifact and link it to the inference Model Control Plane version
 
 It showcases the core ZenML concepts for supervised ML with batch predictions:
 
@@ -102,7 +104,7 @@ model_config:
 
 [πŸ“‚ Code folder](template/steps/etl/)

- +

Usually at the very beginning of every training pipeline developers are acquiring data to work with in later stages. In this example, we are using [the Breast Cancer Dataset](https://scikit-learn.org/stable/modules/generated/sklearn.datasets.load_breast_cancer.html) to showcase steps but avoid high computational costs. @@ -115,12 +117,55 @@ We also output `preprocess_pipeline` as an output artifact from `train_data_prep [πŸ“‚ Code folder](template/steps/%7B%25%20if%20hyperparameters_tuning%20%25%7Dhp_tuning%7B%25%20endif%20%25%7D)

- +

 To ensure the high quality of ML models many ML Engineers go for automated hyperparameter tuning or even automated model architecture search. In this example, we are using prepared data from ETL to spin up a search of the best model parameters for different architectures in parallel.
 
-To create parallel processing of computationally expensive operations we use a loop over predefined potential architectures and respective parameters search grid and create one step for each candidate. After the steps are created we need to collect results (one best model per each search step) in a `hp_tuning_select_best_model` step to define the final winner and pass it to training. To ensure that collection goes smoothly and in full we use an `after` statement populated with all search steps names, so the selector job will wait for the completion of all searches.
+To parallelize these computationally expensive operations, we loop over the predefined candidate architectures and their respective parameter search grids and create one step per candidate. Inside each hyperparameter tuning step instance, we run random search cross-validation to find the best parameters and then evaluate the result using the metric of interest (`accuracy_score` in this example). We attach the computed metric to the output artifact as metadata so it can be used later in `hp_tuning_select_best_model`.
+
+ Code snippet πŸ’» + +```python +from zenml import log_artifact_metadata + +score = accuracy_score(y_tst, y_pred) +# log score along with output artifact as metadata +log_artifact_metadata( + output_name="hp_result", + metric=float(score), +) +``` +
+
+After the steps are executed we need to collect the results (one best model from each search step) in a `hp_tuning_select_best_model` step to define the final winner and pass it to training. We use the Model Control Plane capabilities to pull the correct artifacts from the previous steps and fetch their metadata for a final evaluation before the actual training.
  Code snippet πŸ’»
+
+```python
+from sklearn.base import ClassifierMixin
+from zenml import get_step_context
+
+model_version = get_step_context().model_config._get_model_version()
+
+best_model = None
+best_metric = -1
+# consume artifacts attached to current model version in Model Control Plane
+for full_artifact_name in model_version.artifact_object_ids:
+    # if the artifact comes from one of the HP tuning steps
+    if full_artifact_name.endswith("hp_result"):
+        hp_output = model_version.artifacts[full_artifact_name]["1"]
+        model: ClassifierMixin = hp_output.load()
+        # fetch the metadata we attached earlier
+        metric = float(hp_output.metadata["metric"].value)
+        # track both the best model and the best metric seen so far
+        if best_model is None or best_metric < metric:
+            best_model = model
+            best_metric = metric
+```
+
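+
+For illustration, here is a sketch of how this fan-out/fan-in can be wired inside the training pipeline definition. The step parameters are assumptions that mirror the shape of the `model_search_space` entries in `train_config.yaml`; the template's `training.py` follows the same pattern:
+
+```python
+after = []
+for config_name, search_config in pipeline_extra["model_search_space"].items():
+    step_name = f"hp_tuning_search_{config_name}"
+    # one tuning step per candidate architecture, all running in parallel
+    hp_tuning_single_search(
+        id=step_name,
+        model_package=search_config["model_package"],
+        model_class=search_config["model_class"],
+        search_grid=search_config["search_grid"],
+        dataset_trn=dataset_trn,
+        dataset_tst=dataset_tst,
+        target=target,
+    )
+    after.append(step_name)
+
+# fan-in: the selector waits for every search step to finish
+best_model = hp_tuning_select_best_model(after=after)
+```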
+ + +To ensure that collection goes smoothly and in full we use an `after` statement populated with all search steps names, so the selector job will wait for the completion of all searches. + + You can find more information about the current state of [Hyperparameter Tuning using ZenML in the documentation](https://docs.zenml.io/user-guide/advanced-guide/pipelining-features/hyper-parameter-tuning). @@ -130,7 +175,7 @@ You can find more information about the current state of [Hyperparameter Tuning [πŸ“‚ Code folder](template/steps/training/)

- +

Having the best model architecture and its hyperparameters defined in the previous section makes it possible to train a quality model object. Also, model training is the right place to bring an [Experiment Tracker](https://docs.zenml.io/stacks-and-components/component-guide/experiment-trackers) into the picture - we will log all metrics and model object itself into the [Experiment Tracker](https://docs.zenml.io/stacks-and-components/component-guide/experiment-trackers), so we can register our model object in a [Model Registry](https://docs.zenml.io/stacks-and-components/component-guide/model-registries) and pass it down to a [Model Deployer](https://docs.zenml.io/stacks-and-components/component-guide/model-deployers) easily and traceable. We will use information from the active stack to make the implementation agnostic of the underlying infrastructure. @@ -168,8 +213,8 @@ def notify_on_success() -> None: @pipeline(on_failure=notify_on_failure) def e2e_example_training(...): ... - promote_metric_compare_promoter_in_model_registry(...) - notify_on_success(after=["promote_metric_compare_promoter_in_model_registry"]) + promote_with_metric_compare(...) + notify_on_success(after=["promote_with_metric_compare"]) ``` @@ -178,16 +223,37 @@ def e2e_example_training(...): [πŸ“‚ Code folder](template/steps/promotion/)

- +

 Once the model object is trained and evaluated on meeting basic quality standards, we would like to understand whether it is good enough to beat the existing model object used in inference. This is a very important step, as promoting a weak model object as inference might have a huge negative impact.
 
 In this example, we are implementing promotion based on metric comparison to decide on the spot and avoid more complex approaches like Champion/Challengers shadow deployments. In other projects, other promotion techniques and strategies can be used.
 
-To achieve this we would retrieve the model registry version from [Model Registry](https://docs.zenml.io/stacks-and-components/component-guide/model-registries): latest (the one we just trained) and current (the one having a proper tag). Next, we need to deploy both model objects using [Model Deployer](https://docs.zenml.io/stacks-and-components/component-guide/model-deployers) and run predictions on the testing set for both of them. Next, we select which one of the model registry versions has a better metric value and associate it with the inference tag.
+To achieve this we retrieve two model versions from the Model Control Plane: the latest (the one we just trained) and the current (the one carrying the target-environment tag). Next, we load both model objects and run predictions on the testing set for each of them. We then select which of the two versions has the better metric value. If the newly trained model performs better, we promote it to the inference stage in the Model Control Plane.
+
+By doing so we ensure that the best-performing version will be used for inference later on and provide seamless integration of relevant artifacts from the training pipeline into the batch inference pipeline.
+
+### [Continuous Deployment] Deployment
+
+The Deployment pipeline is designed to run within the inference Model Control Plane version context. This ensures that we always deploy only a quality-assured Model Control Plane version and provides seamless integration of the required artifacts created during training of that Model Control Plane version.
+This is achieved by providing this configuration in `deployer_config.yaml`, which is used to configure our pipeline:
+```yaml
+model_config:
+  name: your_product_name
+  version: production
+```
+
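+
+For reference, a minimal sketch of how this config is picked up when launching the pipeline (the pipeline name is the templated one generated by this project; `run.py` does the same wiring):
+
+```python
+from datetime import datetime as dt
+
+from pipelines import your_product_name_deployment  # templated name, illustrative
+
+your_product_name_deployment.with_options(
+    config_path="configs/deployer_config.yaml",
+    run_name=f"deployment_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}",
+)()
+```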

+ +

+
+The deployment pipeline takes the model object trained earlier from the inference Model Control Plane version and produces a prediction service, which can be used by external tools or by another pipeline. In this case it will be used by the batch inference pipeline later on. The prepared prediction service is linked to the same inference Model Control Plane version.
+
+```
+NOTE: In this template a prediction service is only created for local orchestrators, but you can redefine the step definition to fit the needs of your infrastructure.
+```
-As a last step we promote the current Model Control Plane version to the inference stage if a promotion decision was made. By doing so we ensure that the best Model Control Plane version would be used for inference later on and ensure seamless integration of relevant artifacts from training pipeline in the batch inference pipeline.
 
 ### [Continuous Deployment] Batch Inference
 
 The Batch Inference pipeline is designed to run with inference Model Control Plane version context. This ensures that we always infer only on quality-assured Model Control Plane version and provide seamless integration of required artifacts created during training of this Model Control Plane version.
@@ -199,7 +265,7 @@ model_config:
 ```
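+
+Inside the steps of a pipeline configured this way, the promoted Model Control Plane version and everything linked to it can be fetched from the step context. A minimal sketch, using the same (currently private) helper the template's steps rely on:
+
+```python
+from zenml import get_step_context
+
+# the step runs inside the inference Model Control Plane version context
+# configured above, so this resolves to the promoted version
+model_version = get_step_context().model_config._get_model_version()
+
+# artifacts linked during training are reachable from that version,
+# e.g. the trained model object registered under the name "model"
+trained_model = model_version.get_model_object("model").load()
+```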

- +

 ### [Continuous Deployment] Batch Inference: ETL Steps
 
@@ -208,7 +274,7 @@ model_config:
 
 The process of loading data is similar to training, even the same step function is used, but with the `is_inference` flag.
 
-But inference flow has an important difference - there is no need to fit preprocessing `Pipeline`, rather we need to reuse one fitted during training on the train set, to ensure that the model object gets the expected input. To do so we will use [ExternalArtifact](https://docs.zenml.io/user-guide/advanced-guide/pipelining-features/configure-steps-pipelines#pass-any-kind-of-data-to-your-steps) with lookup by `model_artifact_name` only to get the preprocessing pipeline fitted during the quality assurance training run. This is possible, since we configured batch inference pipeline to run inside a Model Control Plane version context.
+But the inference flow has an important difference - there is no need to fit a preprocessing sklearn `Pipeline`; rather, we need to reuse the one fitted during training on the train set, to ensure that the model object gets the expected input. To do so we will use [ExternalArtifact](https://docs.zenml.io/user-guide/advanced-guide/pipelining-features/configure-steps-pipelines#pass-any-kind-of-data-to-your-steps) with a lookup by `model_artifact_name` only to get the preprocessing pipeline fitted during the quality-assured training run. This is possible since we configured the batch inference pipeline to run inside a Model Control Plane version context.
Code snippet πŸ’» @@ -230,9 +296,9 @@ df_inference = inference_data_preprocessor( [πŸ“‚ Code folder](template/steps/%7B%25%20if%20data_quality_checks%20%25%7Ddata_quality%7B%25%20endif%20%25%7D) -On the drift reporting stage we will use [standard step](https://docs.zenml.io/stacks-and-components/component-guide/data-validators/evidently#the-evidently-data-validator) `evidently_report_step` to build Evidently report to assess certain data quality metrics. `evidently_report_step` has a number of options, but for this example, we will build only `DataQualityPreset` metrics preset to get a number of NA values in reference and current datasets. +In the drift reporting stage, we will use [standard step](https://docs.zenml.io/stacks-and-components/component-guide/data-validators/evidently#the-evidently-data-validator) `evidently_report_step` to build Evidently report to assess certain data quality metrics. `evidently_report_step` has a number of options, but for this example, we will build only `DataQualityPreset` metrics preset to get a number of NA values in reference and current datasets. -We pass `dataset_trn` from training pipeline as a `reference_dataset` here. To do so we will use [ExternalArtifact](https://docs.zenml.io/user-guide/advanced-guide/pipelining-features/configure-steps-pipelines#pass-any-kind-of-data-to-your-steps) with lookup by `model_artifact_name` only to get the training dataset used during quality-assured training run. This is possible, since we configured batch inference pipeline to run inside a Model Control Plane version context. +We pass `dataset_trn` from the training pipeline as a `reference_dataset` here. To do so we will use [ExternalArtifact](https://docs.zenml.io/user-guide/advanced-guide/pipelining-features/configure-steps-pipelines#pass-any-kind-of-data-to-your-steps) with lookup by `model_artifact_name` only to get the training dataset used during quality-assured training run. This is possible since we configured the batch inference pipeline to run inside a Model Control Plane version context. After the report is built we execute another quality gate using the `drift_quality_gate` step, which assesses if a significant drift in the NA count is observed. If so, execution is stopped with an exception. @@ -242,11 +308,14 @@ You can follow [Data Validators docs](https://docs.zenml.io/stacks-and-component [πŸ“‚ Code folder](template/steps/inference) -As a last step concluding all work done so far, we will calculate predictions on the inference dataset and persist them in [Artifact Store](https://docs.zenml.io/stacks-and-components/component-guide/artifact-stores) for reuse. +As a last step concluding all work done so far, we will calculate predictions on the inference dataset and persist them in [Artifact Store](https://docs.zenml.io/stacks-and-components/component-guide/artifact-stores) attached to the current inference model version of the Model Control Plane for reuse and observability. + +We will leverage a prepared predictions service called `mlflow_deployment` linked to the inference model version of the Model Control Plane to run `.predict()` and to put predictions as an output of the predictions step, so it is automatically stored in the [Artifact Store](https://docs.zenml.io/stacks-and-components/component-guide/artifact-stores) and linked to the Model Control Plane model version as a versioned artifact link with zero effort. This is achieved because we additionally annotated the `predictions` output with `ArtifactConfig(overwrite=False)`. 
This is required to deliver a comprehensive history to stakeholders since Batch Inference can be executed using the same Model Control Plane version multiple times. -As we performed promotion as part of the training pipeline it is very easy to fetch the needed model registry version from [Model Registry](https://docs.zenml.io/stacks-and-components/component-guide/model-registries) and deploy it for inference with [Model Deployer](https://docs.zenml.io/stacks-and-components/component-guide/model-deployers). +``` +NOTE: On non-local orchestrators a `model` artifact will be loaded into memory to run predictions directly. You can adapt this part to your needs. +``` -Once the model registry version is deployed the only thing left over is to call `.predict()` on the deployment service object and put those predictions as an output of the predictions step, so it is automatically stored in the [Artifact Store](https://docs.zenml.io/stacks-and-components/component-guide/artifact-stores) with zero effort. We additionally annotated the `predictions` output with `ArtifactConfig(overwrite=False)` to ensure that this artifact will get linked to a Model Control Plane version in a versioned fashion. This is required to deliver a comprehensive history to stakeholders since Batch Inference can be executed using the same Model Control Plane version multiple times.
Code snippet πŸ’» diff --git a/assets/01_etl.png b/assets/01_etl.png index c1fe3ec..b599ca3 100644 Binary files a/assets/01_etl.png and b/assets/01_etl.png differ diff --git a/assets/02_hp.png b/assets/02_hp.png index 00f2660..1dacd03 100644 Binary files a/assets/02_hp.png and b/assets/02_hp.png differ diff --git a/assets/03_train.png b/assets/03_train.png index ed0beb2..c8b45f6 100644 Binary files a/assets/03_train.png and b/assets/03_train.png differ diff --git a/assets/04_promotion.png b/assets/04_promotion.png index 0853f70..5cd1de1 100644 Binary files a/assets/04_promotion.png and b/assets/04_promotion.png differ diff --git a/assets/05_batch_inference.png b/assets/05_batch_inference.png deleted file mode 100644 index d4e6d7a..0000000 Binary files a/assets/05_batch_inference.png and /dev/null differ diff --git a/assets/05_deployment.png b/assets/05_deployment.png new file mode 100644 index 0000000..b22e8a7 Binary files /dev/null and b/assets/05_deployment.png differ diff --git a/assets/06_batch_inference.png b/assets/06_batch_inference.png new file mode 100644 index 0000000..b353fa8 Binary files /dev/null and b/assets/06_batch_inference.png differ diff --git a/template/.assets/00_pipelines_composition.png b/template/.assets/00_pipelines_composition.png index 3de1092..a01f067 100644 Binary files a/template/.assets/00_pipelines_composition.png and b/template/.assets/00_pipelines_composition.png differ diff --git a/template/README.md b/template/README.md index 89e0cfa..f8a261a 100644 --- a/template/README.md +++ b/template/README.md @@ -104,12 +104,12 @@ This template uses to demonstrate how to perform major critical steps for Continuous Training (CT) and Continuous Delivery (CD). -It consists of two pipelines with the following high-level setup: +It consists of three pipelines with the following high-level setup:

- +

-Both pipelines are inside a shared Model Control Plane model context - training pipeline creates and promotes new Model Control Plane version and inference pipeline is reading from inference Model Control Plane version. This makes those pipelines closely connected, while ensuring that only quality-assured Model Control Plane versions are used to produce predictions delivered to stakeholders.
+All pipelines are leveraging the Model Control Plane to bring all parts together - the training pipeline creates and promotes a new Model Control Plane version with a trained model object in it, the deployment pipeline uses the inference Model Control Plane version (the one promoted during training) to create a deployment service, and the batch inference pipeline uses the deployment service from the inference Model Control Plane version and stores a new set of predictions back as a versioned data artifact for future use. This makes those pipelines closely connected while ensuring that only quality-assured Model Control Plane versions are used to produce predictions delivered to stakeholders.
 * [CT] Training
   * Load, split, and preprocess the training dataset
   * Search for an optimal model object architecture and tune its hyperparameters
@@ -117,6 +117,8 @@ Both pipelines are inside a shared Model Control Plane model context - training
   * Compare a recently trained model object with one promoted earlier
   * If a recently trained model object performs better - stage it as a new inference model object in model registry
   * On success of the current model object - stage newly created Model Control Plane version as the one used for inference
+* [CD] Deployment
+  * Deploy a new prediction service based on the model object connected to the inference Model Control Plane version.
 * [CD] Batch Inference
   * Load the inference dataset and preprocess it reusing object fitted during training
   * Perform data drift analysis reusing training dataset of the inference Model Control Plane version as a reference
@@ -142,23 +144,27 @@ The project loosely follows [the recommended ZenML project structure](https://do
 ```
 .
-β”œβ”€β”€ pipelines # `zenml.pipeline` implementations -β”‚ β”œβ”€β”€ batch_inference.py # [CD] Batch Inference pipeline -β”‚ └── training.py # [CT] Training Pipeline -β”œβ”€β”€ steps # logically grouped `zenml.steps` implementations -β”‚ β”œβ”€β”€ alerts # alert developer on pipeline status -β”‚ β”œβ”€β”€ data_quality # quality gates built on top of drift report -β”‚ β”œβ”€β”€ etl # ETL logic for dataset -β”‚ β”œβ”€β”€ hp_tuning # tune hyperparameters and model architectures -β”‚ β”œβ”€β”€ inference # inference on top of the model from the registry -β”‚ β”œβ”€β”€ promotion # find if a newly trained model will be new inference -β”‚ └── training # train and evaluate model -β”œβ”€β”€ utils # helper functions +β”œβ”€β”€ configs # pipelines configuration files +β”‚ β”œβ”€β”€ deployer_config.yaml # the configuration of the deployment pipeline +β”‚ β”œβ”€β”€ inference_config.yaml # the configuration of the batch inference pipeline +β”‚ └── train_config.yaml # the configuration of the training pipeline +β”œβ”€β”€ pipelines # `zenml.pipeline` implementations +β”‚ β”œβ”€β”€ batch_inference.py # [CD] Batch Inference pipeline +β”‚ β”œβ”€β”€ deployment.py # [CD] Deployment pipeline +β”‚ └── training.py # [CT] Training Pipeline +β”œβ”€β”€ steps # logically grouped `zenml.steps` implementations +β”‚ β”œβ”€β”€ alerts # alert developer on pipeline status +β”‚ β”œβ”€β”€ deployment # deploy trained model objects +β”‚ β”œβ”€β”€ data_quality # quality gates built on top of drift report +β”‚ β”œβ”€β”€ etl # ETL logic for dataset +β”‚ β”œβ”€β”€ hp_tuning # tune hyperparameters and model architectures +β”‚ β”œβ”€β”€ inference # inference on top of the model from the registry +β”‚ β”œβ”€β”€ promotion # find if a newly trained model will be new inference +β”‚ └── training # train and evaluate model +β”œβ”€β”€ utils # helper functions β”œβ”€β”€ .dockerignore -β”œβ”€β”€ inference_config.yaml # the configuration of the batch inference pipeline -β”œβ”€β”€ Makefile # helper scripts for quick start with integrations -β”œβ”€β”€ README.md # this file -β”œβ”€β”€ requirements.txt # extra Python dependencies -β”œβ”€β”€ run.py # CLI tool to run pipelines on ZenML Stack -└── train_config.yaml # the configuration of the training pipeline +β”œβ”€β”€ Makefile # helper scripts for quick start with integrations +β”œβ”€β”€ README.md # this file +β”œβ”€β”€ requirements.txt # extra Python dependencies +└── run.py # CLI tool to run pipelines on ZenML Stack ``` diff --git a/template/configs/deployer_config.yaml b/template/configs/deployer_config.yaml new file mode 100644 index 0000000..d152dde --- /dev/null +++ b/template/configs/deployer_config.yaml @@ -0,0 +1,31 @@ +# {% include 'template/license_header' %} + +# environment configuration +settings: + docker: + required_integrations: + - aws +{%- if data_quality_checks %} + - evidently +{%- endif %} + - kubeflow + - kubernetes + - mlflow + - sklearn + - slack + +# configuration of steps +steps: + notify_on_success: + parameters: + notify_on_success: False + +# configuration of the Model Control Plane +model_config: + name: {{ product_name }} + version: {{ target_environment }} + +# pipeline level extra configurations +extra: + notify_on_failure: True + diff --git a/template/inference_config.yaml b/template/configs/inference_config.yaml similarity index 62% rename from template/inference_config.yaml rename to template/configs/inference_config.yaml index 6d9d2e1..d152dde 100644 --- a/template/inference_config.yaml +++ b/template/configs/inference_config.yaml @@ -1,5 +1,6 @@ # {% include 
'template/license_header' %} +# environment configuration settings: docker: required_integrations: @@ -12,15 +13,19 @@ settings: - mlflow - sklearn - slack -extra: - mlflow_model_name: {{ product_name }} -{%- if target_environment == 'production' %} - target_env: Production -{%- else %} - target_env: Staging -{%- endif %} - notify_on_success: False - notify_on_failure: True + +# configuration of steps +steps: + notify_on_success: + parameters: + notify_on_success: False + +# configuration of the Model Control Plane model_config: name: {{ product_name }} version: {{ target_environment }} + +# pipeline level extra configurations +extra: + notify_on_failure: True + diff --git a/template/train_config.yaml b/template/configs/train_config.yaml similarity index 70% rename from template/train_config.yaml rename to template/configs/train_config.yaml index 1f6f088..517812d 100644 --- a/template/train_config.yaml +++ b/template/configs/train_config.yaml @@ -1,5 +1,6 @@ # {% include 'template/license_header' %} +# environment configuration settings: docker: required_integrations: @@ -12,18 +13,53 @@ settings: - mlflow - sklearn - slack -extra: - mlflow_model_name: {{ product_name }} -{%- if target_environment == 'production' %} - target_env: Production + +# configuration of steps +steps: + model_trainer: + parameters: + name: {{ product_name }} +{%- if metric_compare_promotion %} + compute_performance_metrics_on_current_data: + parameters: + target_env: {{ target_environment }} + promote_with_metric_compare: {%- else %} - target_env: Staging + promote_latest_version: {%- endif %} - notify_on_success: False + parameters: + mlflow_model_name: {{ product_name }} + target_env: {{ target_environment }} + notify_on_success: + parameters: + notify_on_success: False + +# configuration of the Model Control Plane +model_config: + name: {{ product_name }} + license: {{ open_source_license }} + description: {{ product_name }} E2E Batch Use Case + audience: All ZenML users + use_cases: | + The {{project_name}} project demonstrates how the most important steps of + the ML Production Lifecycle can be implemented in a reusable way remaining + agnostic to the underlying infrastructure, and shows how to integrate them together + into pipelines for Training and Batch Inference purposes. + ethics: No impact. + tags: + - e2e + - batch + - sklearn + - from template + - ZenML delivered + create_new_model_version: true + +# pipeline level extra configurations +extra: notify_on_failure: True {%- if hyperparameters_tuning %} - # This set contains all the models that you want to evaluate - # during hyperparameter tuning stage. + # This set contains all the model configurations that you want + # to evaluate during hyperparameter tuning stage. model_search_space: random_forest: model_package: sklearn.ensemble @@ -67,7 +103,7 @@ extra: start: 1 end: 10 {%- else %} - # This model configuration will be used for the training stage. + # This model configuration will be used for the training stage. 
model_configuration: model_package: sklearn.tree model_class: DecisionTreeClassifier @@ -75,22 +111,4 @@ extra: criterion: gini max_depth: 5 min_samples_leaf: 3 -{%- endif %} -model_config: - name: {{ product_name }} - license: {{ open_source_license }} - description: {{ product_name }} E2E Batch Use Case - audience: All ZenML users - use_cases: | - The {{project_name}} project demonstrates how the most important steps of - the ML Production Lifecycle can be implemented in a reusable way remaining - agnostic to the underlying infrastructure, and shows how to integrate them together - into pipelines for Training and Batch Inference purposes. - ethics: No impact. - tags: - - e2e - - batch - - sklearn - - from template - - ZenML delivered - create_new_model_version: true \ No newline at end of file +{%- endif %} \ No newline at end of file diff --git a/template/pipelines/__init__.py b/template/pipelines/__init__.py index 3d65248..ac1e395 100644 --- a/template/pipelines/__init__.py +++ b/template/pipelines/__init__.py @@ -3,3 +3,4 @@ from .batch_inference import {{product_name}}_batch_inference from .training import {{product_name}}_training +from .deployment import {{product_name}}_deployment diff --git a/template/pipelines/batch_inference.py b/template/pipelines/batch_inference.py index eb6f231..56ddf6d 100644 --- a/template/pipelines/batch_inference.py +++ b/template/pipelines/batch_inference.py @@ -1,23 +1,18 @@ # {% include 'template/license_header' %} - from steps import ( data_loader, {%- if data_quality_checks %} drift_quality_gate, {%- endif %} inference_data_preprocessor, - inference_get_current_version, inference_predict, notify_on_failure, notify_on_success, ) -from zenml import get_pipeline_context, pipeline +from zenml import pipeline from zenml.integrations.evidently.metrics import EvidentlyMetricConfig from zenml.integrations.evidently.steps import evidently_report_step -from zenml.integrations.mlflow.steps.mlflow_deployer import ( - mlflow_model_registry_deployer_step, -) from zenml.logger import get_logger from zenml.artifacts.external_artifact import ExternalArtifact @@ -36,7 +31,13 @@ def {{product_name}}_batch_inference(): # Link all the steps together by calling them and passing the output # of one step as the input of the next step. 
########## ETL stage ########## - df_inference, target = data_loader(is_inference=True) + df_inference, target, _ = data_loader( + random_state=ExternalArtifact( + model_artifact_pipeline_name="{{product_name}}_training", + model_artifact_name="random_state", + ), + is_inference=True + ) df_inference = inference_data_preprocessor( dataset_inf=df_inference, preprocess_pipeline=ExternalArtifact( @@ -60,15 +61,7 @@ def {{product_name}}_batch_inference(): drift_quality_gate(report) {%- endif %} ########## Inference stage ########## - deployment_service = mlflow_model_registry_deployer_step( - registry_model_name=get_pipeline_context().extra["mlflow_model_name"], - registry_model_version=ExternalArtifact( - model_artifact_name="promoted_version", - ), - replace_existing=True, - ) inference_predict( - deployment_service=deployment_service, dataset_inf=df_inference, {%- if data_quality_checks %} after=["drift_quality_gate"], diff --git a/template/pipelines/deployment.py b/template/pipelines/deployment.py new file mode 100644 index 0000000..aa4a21c --- /dev/null +++ b/template/pipelines/deployment.py @@ -0,0 +1,22 @@ +# {% include 'template/license_header' %} + +from steps import deployment_deploy,notify_on_success,notify_on_failure + +from zenml import pipeline + + +@pipeline(on_failure=notify_on_failure) +def {{product_name}}_deployment(): + """ + Model deployment pipeline. + + This is a pipeline deploys trained model for future inference. + """ + ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### + # Link all the steps together by calling them and passing the output + # of one step as the input of the next step. + ########## Deployment stage ########## + deployment_deploy() + + notify_on_success(after=["deployment_deploy"]) + ### YOUR CODE ENDS HERE ### diff --git a/template/pipelines/training.py b/template/pipelines/training.py index 1b1e766..b89eb96 100644 --- a/template/pipelines/training.py +++ b/template/pipelines/training.py @@ -2,40 +2,35 @@ from typing import List, Optional +import random from steps import ( data_loader, -{%- if hyperparameters_tuning %} - hp_tuning_select_best_model, - hp_tuning_single_search, -{%- endif %} model_evaluator, model_trainer, notify_on_failure, notify_on_success, + train_data_preprocessor, + train_data_splitter, +{%- if hyperparameters_tuning %} + hp_tuning_select_best_model, + hp_tuning_single_search, +{%- endif %} {%- if metric_compare_promotion %} - promote_get_metric, - promote_metric_compare_promoter_in_model_registry, + compute_performance_metrics_on_current_data, + promote_with_metric_compare, {%- else %} - promote_latest_in_model_registry, + promote_latest_version, {%- endif %} - promote_get_versions, - promote_model_version_in_model_control_plane, - train_data_preprocessor, - train_data_splitter, ) from zenml import pipeline, get_pipeline_context -from zenml.integrations.mlflow.steps.mlflow_deployer import ( - mlflow_model_registry_deployer_step, -) -from zenml.integrations.mlflow.steps.mlflow_registry import mlflow_register_model_step from zenml.logger import get_logger {%- if hyperparameters_tuning %} {%- else %} from zenml.artifacts.external_artifact import ExternalArtifact -from utils.get_model_from_config import get_model_from_config +from utils import get_model_from_config {%- endif %} logger = get_logger(__name__) @@ -74,7 +69,7 @@ def {{product_name}}_training( # of one step as the input of the next step. 
pipeline_extra = get_pipeline_context().extra ########## ETL stage ########## - raw_data, target = data_loader() + raw_data, target, _ = data_loader(random_state=random.randint(0,100)) dataset_trn, dataset_tst = train_data_splitter( dataset=raw_data, test_size=test_size, @@ -103,9 +98,7 @@ def {{product_name}}_training( target=target, ) after.append(step_name) - best_model = hp_tuning_select_best_model( - search_steps_prefix=search_steps_prefix, after=after - ) + best_model = hp_tuning_select_best_model(after=after) {%- else %} model_configuration = pipeline_extra["model_configuration"] best_model = get_model_from_config( @@ -133,55 +126,22 @@ def {{product_name}}_training( fail_on_accuracy_quality_gates=fail_on_accuracy_quality_gates, target=target, ) - mlflow_register_model_step( - model, - name=pipeline_extra["mlflow_model_name"], - ) - ########## Promotion stage ########## - latest_version, current_version = promote_get_versions( - after=["mlflow_register_model_step"], - ) {%- if metric_compare_promotion %} - latest_deployment = mlflow_model_registry_deployer_step( - id="deploy_latest_model_version", - registry_model_name=pipeline_extra["mlflow_model_name"], - registry_model_version=latest_version, - replace_existing=True, - ) - latest_metric = promote_get_metric( - id="get_metrics_latest_model_version", + latest_metric,current_metric = compute_performance_metrics_on_current_data( dataset_tst=dataset_tst, - deployment_service=latest_deployment, + after=["model_evaluator"] ) - current_deployment = mlflow_model_registry_deployer_step( - id="deploy_current_model_version", - registry_model_name=pipeline_extra["mlflow_model_name"], - registry_model_version=current_version, - replace_existing=True, - after=["get_metrics_latest_model_version"], - ) - current_metric = promote_get_metric( - id="get_metrics_current_model_version", - dataset_tst=dataset_tst, - deployment_service=current_deployment, - ) - - was_promoted, promoted_version = promote_metric_compare_promoter_in_model_registry( + promote_with_metric_compare( latest_metric=latest_metric, current_metric=current_metric, - latest_version=latest_version, - current_version=current_version, ) + last_step = "promote_with_metric_compare" {%- else %} - promoted_version = promote_latest_in_model_registry( - latest_version=latest_version, - current_version=current_version, - ) - was_promoted = True + promote_latest_version(after=["model_evaluator"]) + last_step = "promote_latest_version" {%- endif %} - promote_model_version_in_model_control_plane(was_promoted) - notify_on_success(after=["promote_model_version_in_model_control_plane"]) + notify_on_success(after=[last_step]) ### YOUR CODE ENDS HERE ### diff --git a/template/run.py b/template/run.py index 123b830..4f40e7d 100644 --- a/template/run.py +++ b/template/run.py @@ -8,7 +8,7 @@ from zenml.artifacts.external_artifact import ExternalArtifact from zenml.logger import get_logger -from pipelines import {{product_name}}_batch_inference, {{product_name}}_training +from pipelines import {{product_name}}_batch_inference, {{product_name}}_training, {{product_name}}_deployment logger = get_logger(__name__) @@ -159,6 +159,7 @@ def main( pipeline_args["config_path"] = os.path.join( os.path.dirname(os.path.realpath(__file__)), + "configs", "train_config.yaml", ) pipeline_args[ @@ -167,10 +168,23 @@ def main( {{product_name}}_training.with_options(**pipeline_args)(**run_args_train) logger.info("Training pipeline finished successfully!") + # Execute Deployment Pipeline + run_args_inference = {} + 
pipeline_args["config_path"] = os.path.join( + os.path.dirname(os.path.realpath(__file__)), + "configs", + "deployer_config.yaml", + ) + pipeline_args[ + "run_name" + ] = f"{{product_name}}_deployment_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}" + {{product_name}}_deployment.with_options(**pipeline_args)(**run_args_inference) + # Execute Batch Inference Pipeline run_args_inference = {} pipeline_args["config_path"] = os.path.join( os.path.dirname(os.path.realpath(__file__)), + "configs", "inference_config.yaml", ) pipeline_args[ diff --git a/template/steps/__init__.py b/template/steps/__init__.py index 3e0eb05..5a8fcea 100644 --- a/template/steps/__init__.py +++ b/template/steps/__init__.py @@ -17,12 +17,11 @@ from .inference import inference_predict from .promotion import ( {%- if metric_compare_promotion %} - promote_get_metric, - promote_metric_compare_promoter_in_model_registry, + compute_performance_metrics_on_current_data, + promote_with_metric_compare, {%- else %} - promote_latest_in_model_registry, + promote_latest_version, {%- endif %} - promote_get_versions, - promote_model_version_in_model_control_plane, ) from .training import model_evaluator, model_trainer +from .deployment import deployment_deploy diff --git a/template/steps/alerts/notify_on.py b/template/steps/alerts/notify_on.py index ae183a9..d9fe9e9 100644 --- a/template/steps/alerts/notify_on.py +++ b/template/steps/alerts/notify_on.py @@ -35,8 +35,7 @@ def notify_on_failure() -> None: @step(enable_cache=False) -def notify_on_success() -> None: +def notify_on_success(notify_on_success: bool) -> None: """Notifies user on pipeline success.""" - step_context = get_step_context() - if alerter and step_context.pipeline_run.config.extra["notify_on_success"]: + if alerter and notify_on_success: alerter.post(message=build_message(status="succeeded")) diff --git a/template/steps/deployment/__init__.py b/template/steps/deployment/__init__.py new file mode 100644 index 0000000..56ca367 --- /dev/null +++ b/template/steps/deployment/__init__.py @@ -0,0 +1,4 @@ +# {% include 'template/license_header' %} + + +from .deployment_deploy import deployment_deploy diff --git a/template/steps/deployment/deployment_deploy.py b/template/steps/deployment/deployment_deploy.py new file mode 100644 index 0000000..fd5d0d4 --- /dev/null +++ b/template/steps/deployment/deployment_deploy.py @@ -0,0 +1,62 @@ +# {% include 'template/license_header' %} + + +from typing import Optional +from typing_extensions import Annotated + +from zenml import step, get_step_context +from zenml.client import Client +from zenml.logger import get_logger +from zenml.model import DeploymentArtifactConfig +from zenml.integrations.mlflow.steps.mlflow_deployer import ( + mlflow_model_registry_deployer_step, +) +from zenml.integrations.mlflow.services.mlflow_deployment import ( + MLFlowDeploymentService, +) +from utils import get_model_registry_version + +logger = get_logger(__name__) + + +@step +def deployment_deploy() -> ( + Annotated[ + Optional[MLFlowDeploymentService], + "mlflow_deployment", + DeploymentArtifactConfig(), + ] +): + """Predictions step. + + This is an example of a predictions step that takes the data in and returns + predicted values. + + This step is parameterized, which allows you to configure the step + independently of the step code, before running it in a pipeline. + In this example, the step can be configured to use different input data. 
+ See the documentation for more information: + + https://docs.zenml.io/user-guide/advanced-guide/configure-steps-pipelines + + Args: + dataset_inf: The inference dataset. + + Returns: + The predictions as pandas series + """ + ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### + if Client().active_stack.orchestrator.flavor == "local": + model_version = get_step_context().model_config._get_model_version() + + # deploy predictor service + deployment_service = mlflow_model_registry_deployer_step.entrypoint( + registry_model_name=model_version.model.name, + registry_model_version=get_model_registry_version(model_version), + replace_existing=True, + ) + else: + logger.warning("Skipping deployment as the orchestrator is not local.") + deployment_service = None + ### YOUR CODE ENDS HERE ### + return deployment_service diff --git a/template/steps/etl/data_loader.py b/template/steps/etl/data_loader.py index 4bcb59e..5fdc7c3 100644 --- a/template/steps/etl/data_loader.py +++ b/template/steps/etl/data_loader.py @@ -7,18 +7,19 @@ import pandas as pd from sklearn.datasets import load_breast_cancer from zenml import step -from zenml.client import Client from zenml.logger import get_logger logger = get_logger(__name__) -artifact_store = Client().active_stack.artifact_store - @step def data_loader( - is_inference: bool = False, -) -> Tuple[Annotated[pd.DataFrame, "dataset"], Annotated[str, "target"],]: + random_state: int, is_inference: bool = False +) -> Tuple[ + Annotated[pd.DataFrame, "dataset"], + Annotated[str, "target"], + Annotated[int, "random_state"], +]: """Dataset reader step. This is an example of a dataset reader step that load Breast Cancer dataset. @@ -33,6 +34,7 @@ def data_loader( Args: is_inference: If `True` subset will be returned and target column will be removed from dataset. + random_state: Random state for sampling Returns: The dataset artifact as Pandas DataFrame and name of target column. @@ -42,7 +44,7 @@ def data_loader( inference_size = int(len(dataset.target) * 0.05) target = "target" dataset: pd.DataFrame = dataset.frame - inference_subset = dataset.sample(inference_size, random_state=42) + inference_subset = dataset.sample(inference_size, random_state=random_state) if is_inference: dataset = inference_subset dataset.drop(columns=target, inplace=True) @@ -51,4 +53,4 @@ def data_loader( dataset.reset_index(drop=True, inplace=True) logger.info(f"Dataset with {len(dataset)} records loaded!") ### YOUR CODE ENDS HERE ### - return dataset, target + return dataset, target, random_state diff --git a/template/steps/etl/inference_data_preprocessor.py b/template/steps/etl/inference_data_preprocessor.py index fa37bfa..9a20f61 100644 --- a/template/steps/etl/inference_data_preprocessor.py +++ b/template/steps/etl/inference_data_preprocessor.py @@ -14,7 +14,11 @@ def inference_data_preprocessor( dataset_inf: pd.DataFrame, preprocess_pipeline: Pipeline, target: str, -) -> Annotated[pd.DataFrame, "dataset_inf", ArtifactConfig(overwrite=False, artifact_name="inference_dataset")]: +) -> Annotated[ + pd.DataFrame, + "dataset_inf", + ArtifactConfig(overwrite=False, artifact_name="inference_dataset"), +]: """Data preprocessor step. 
This is an example of a data processor step that prepares the data so that diff --git a/template/steps/inference/inference_predict.py b/template/steps/inference/inference_predict.py index c0b52fb..d92e4de 100644 --- a/template/steps/inference/inference_predict.py +++ b/template/steps/inference/inference_predict.py @@ -1,19 +1,22 @@ # {% include 'template/license_header' %} +from typing import Optional from typing_extensions import Annotated import pandas as pd -from zenml import step -from zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import ( +from zenml import step, get_step_context +from zenml.logger import get_logger +from zenml.model import ArtifactConfig +from zenml.integrations.mlflow.services.mlflow_deployment import ( MLFlowDeploymentService, ) -from zenml.model import ArtifactConfig + +logger = get_logger(__name__) @step def inference_predict( - deployment_service: MLFlowDeploymentService, dataset_inf: pd.DataFrame, ) -> Annotated[pd.Series, "predictions", ArtifactConfig(overwrite=False)]: """Predictions step. @@ -23,22 +26,37 @@ def inference_predict( This step is parameterized, which allows you to configure the step independently of the step code, before running it in a pipeline. - In this example, the step can be configured to use different input data - and model version in registry. See the documentation for more information: + In this example, the step can be configured to use different input data. + See the documentation for more information: https://docs.zenml.io/user-guide/advanced-guide/configure-steps-pipelines Args: - deployment_service: Deployed model service. dataset_inf: The inference dataset. Returns: - The processed dataframe: dataset_inf. + The predictions as pandas series """ ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### - predictions = deployment_service.predict(request=dataset_inf) + model_version = get_step_context().model_config._get_model_version() + + # get predictor + predictor_service: Optional[MLFlowDeploymentService] = model_version.get_deployment( + "mlflow_deployment" + ).load() + if predictor_service is not None: + # run prediction from service + predictions = predictor_service.predict(request=dataset_inf) + else: + logger.warning( + "Predicting from loaded model instead of deployment service " + "as the orchestrator is not local." 
+ ) + # run prediction from memory + predictor = model_version.get_model_object("model").load() + predictions = predictor.predict(dataset_inf) + predictions = pd.Series(predictions, name="predicted") - deployment_service.deprovision(force=True) ### YOUR CODE ENDS HERE ### return predictions diff --git a/template/steps/promotion/__init__.py b/template/steps/promotion/__init__.py index afeb1cb..623727e 100644 --- a/template/steps/promotion/__init__.py +++ b/template/steps/promotion/__init__.py @@ -2,10 +2,8 @@ {%- if metric_compare_promotion %} -from .promote_get_metric import promote_get_metric -from .promote_metric_compare_promoter_in_model_registry import promote_metric_compare_promoter_in_model_registry +from .compute_performance_metrics_on_current_data import compute_performance_metrics_on_current_data +from .promote_with_metric_compare import promote_with_metric_compare {%- else %} -from .promote_latest_in_model_registry import promote_latest_in_model_registry +from .promote_latest_version import promote_latest_version {%- endif %} -from .promote_get_versions import promote_get_versions -from .promote_model_version_in_model_control_plane import promote_model_version_in_model_control_plane diff --git a/template/steps/promotion/promote_get_versions.py b/template/steps/promotion/promote_get_versions.py deleted file mode 100644 index bb5ff3c..0000000 --- a/template/steps/promotion/promote_get_versions.py +++ /dev/null @@ -1,53 +0,0 @@ -# {% include 'template/license_header' %} - - -from typing import Tuple -from typing_extensions import Annotated - -from zenml import get_step_context, step -from zenml.client import Client -from zenml.logger import get_logger -from zenml.model_registries.base_model_registry import ModelVersionStage - -logger = get_logger(__name__) - -model_registry = Client().active_stack.model_registry - - -@step -def promote_get_versions() -> ( - Tuple[Annotated[str, "latest_version"], Annotated[str, "current_version"]] -): - """Step to get latest and currently tagged model version from Model Registry. - - This is an example of a model version extraction step. It will retrieve 2 model - versions from Model Registry: latest and currently promoted to target - environment (Production, Staging, etc). - - Returns: - The model versions: latest and current. If not current version - returns same - for both. 
- """ - - ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### - pipeline_extra = get_step_context().pipeline_run.config.extra - none_versions = model_registry.list_model_versions( - name=pipeline_extra["mlflow_model_name"], - stage=None, - ) - latest_versions = none_versions[0].version - logger.info(f"Latest model version is {latest_versions}") - - target_versions = model_registry.list_model_versions( - name=pipeline_extra["mlflow_model_name"], - stage=ModelVersionStage(pipeline_extra["target_env"]), - ) - current_version = latest_versions - if target_versions: - current_version = target_versions[0].version - logger.info(f"Currently promoted model version is {current_version}") - else: - logger.info("No currently promoted model version found.") - ### YOUR CODE ENDS HERE ### - - return latest_versions, current_version diff --git a/template/steps/promotion/promote_model_version_in_model_control_plane.py b/template/steps/promotion/promote_model_version_in_model_control_plane.py deleted file mode 100644 index 725311b..0000000 --- a/template/steps/promotion/promote_model_version_in_model_control_plane.py +++ /dev/null @@ -1,26 +0,0 @@ -# {% include 'template/license_header' %} - - -from zenml import get_step_context, step -from zenml.logger import get_logger - -logger = get_logger(__name__) - - -@step -def promote_model_version_in_model_control_plane(promotion_decision: bool): - """Step to promote current model version to target environment in Model Control Plane. - - Args: - promotion_decision: Whether to promote current model version to target environment - """ - - ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### - if promotion_decision: - target_env = get_step_context().pipeline_run.config.extra["target_env"].lower() - model_version = get_step_context().model_config._get_model_version() - model_version.set_stage(stage=target_env, force=True) - logger.info(f"Current model version was promoted to '{target_env}'.") - else: - logger.info("Current model version was not promoted.") - ### YOUR CODE ENDS HERE ### diff --git a/template/steps/promotion/{% if metric_compare_promotion %}compute_performance_metrics_on_current_data.py{% endif %} b/template/steps/promotion/{% if metric_compare_promotion %}compute_performance_metrics_on_current_data.py{% endif %} new file mode 100644 index 0000000..6bb12e9 --- /dev/null +++ b/template/steps/promotion/{% if metric_compare_promotion %}compute_performance_metrics_on_current_data.py{% endif %} @@ -0,0 +1,63 @@ +# {% include 'template/license_header' %} + +from typing import Tuple +from typing_extensions import Annotated + +import pandas as pd +from sklearn.metrics import accuracy_score +from zenml import step +from zenml.logger import get_logger + +from utils import get_model_versions + +logger = get_logger(__name__) + +@step +def compute_performance_metrics_on_current_data( + dataset_tst: pd.DataFrame, + target_env: str, +) -> Tuple[Annotated[float, "latest_metric"],Annotated[float, "current_metric"]]: + """Get metrics for comparison during promotion on fresh dataset. + + This is an example of a metrics calculation step. It computes metric + on recent test dataset. + + This step is parameterized, which allows you to configure the step + independently of the step code, before running it in a pipeline. + In this example, the step can be configured to use different input data + and target environment stage for promotion. 
+ See the documentation for more information: + + https://docs.zenml.io/user-guide/advanced-guide/configure-steps-pipelines + + Args: + dataset_tst: The test dataset. + + Returns: + Latest version and current version metric values on a test set. + """ + + ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### + X = dataset_tst.drop(columns=["target"]) + y = dataset_tst["target"].to_numpy() + logger.info("Evaluating model metrics...") + + # Get model version numbers from Model Control Plane + latest_version, current_version = get_model_versions(target_env) + + # Get predictors + predictors = { + latest_version.number: latest_version.get_model_object("model").load(), + current_version.number: current_version.get_model_object("model").load(), + } + + if latest_version != current_version: + metrics = {} + for version in [latest_version.number,current_version.number]: + # predict and evaluate + predictions = predictors[version].predict(X) + metrics[version]=accuracy_score(y, predictions) + else: + metrics = {latest_version.number:1.0,current_version.number:0.0} + ### YOUR CODE ENDS HERE ### + return metrics[latest_version.number],metrics[current_version.number] diff --git a/template/steps/promotion/{% if metric_compare_promotion %}promote_get_metric.py{% endif %} b/template/steps/promotion/{% if metric_compare_promotion %}promote_get_metric.py{% endif %} deleted file mode 100644 index 592ff05..0000000 --- a/template/steps/promotion/{% if metric_compare_promotion %}promote_get_metric.py{% endif %} +++ /dev/null @@ -1,53 +0,0 @@ -# {% include 'template/license_header' %} - - -from typing_extensions import Annotated - -import pandas as pd -from sklearn.metrics import accuracy_score -from zenml import step -from zenml.client import Client -from zenml.integrations.mlflow.services import MLFlowDeploymentService -from zenml.logger import get_logger - -logger = get_logger(__name__) - -model_registry = Client().active_stack.model_registry - - -@step -def promote_get_metric( - dataset_tst: pd.DataFrame, - deployment_service: MLFlowDeploymentService, -) -> Annotated[float, "metric"]: - """Get metric for comparison for one model deployment. - - This is an example of a metric calculation step. It get a model deployment - service and computes metric on recent test dataset. - - This step is parameterized, which allows you to configure the step - independently of the step code, before running it in a pipeline. - In this example, the step can be configured to use different input data. - See the documentation for more information: - - https://docs.zenml.io/user-guide/advanced-guide/configure-steps-pipelines - - Args: - dataset_tst: The test dataset. - deployment_service: Model version deployment. - - Returns: - Metric value for a given deployment on test set. 
- - """ - - ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### - X = dataset_tst.drop(columns=["target"]) - y = dataset_tst["target"].to_numpy() - logger.info("Evaluating model metrics...") - - predictions = deployment_service.predict(request=X) - metric = accuracy_score(y, predictions) - deployment_service.deprovision(force=True) - ### YOUR CODE ENDS HERE ### - return metric diff --git a/template/steps/promotion/{% if metric_compare_promotion %}promote_metric_compare_promoter_in_model_registry.py{% endif %} b/template/steps/promotion/{% if metric_compare_promotion %}promote_with_metric_compare.py{% endif %} similarity index 52% rename from template/steps/promotion/{% if metric_compare_promotion %}promote_metric_compare_promoter_in_model_registry.py{% endif %} rename to template/steps/promotion/{% if metric_compare_promotion %}promote_with_metric_compare.py{% endif %} index 8498876..9e9dc73 100644 --- a/template/steps/promotion/{% if metric_compare_promotion %}promote_metric_compare_promoter_in_model_registry.py{% endif %} +++ b/template/steps/promotion/{% if metric_compare_promotion %}promote_with_metric_compare.py{% endif %} @@ -1,23 +1,20 @@ # {% include 'template/license_header' %} -from typing_extensions import Annotated, Tuple from zenml import get_step_context, step -from zenml.client import Client from zenml.logger import get_logger -from zenml.model_registries.base_model_registry import ModelVersionStage -logger = get_logger(__name__) +from utils import get_model_versions, promote_in_model_registry, get_model_registry_version -model_registry = Client().active_stack.model_registry +logger = get_logger(__name__) @step -def promote_metric_compare_promoter_in_model_registry( +def promote_with_metric_compare( latest_metric: float, current_metric: float, - latest_version: str, - current_version: str, -)->Tuple[Annotated[bool, "was_promoted"], Annotated[int,"promoted_version"]]: + mlflow_model_name: str, + target_env: str, +)->None: """Try to promote trained model. This is an example of a model promotion step. It gets precomputed @@ -31,7 +28,8 @@ def promote_metric_compare_promoter_in_model_registry( This step is parameterized, which allows you to configure the step independently of the step code, before running it in a pipeline. - In this example, the step can be configured to use different input data. + In this example, the step can be configured to use precomputed model metrics + and target environment stage for promotion. See the documentation for more information: https://docs.zenml.io/user-guide/advanced-guide/configure-steps-pipelines @@ -39,16 +37,15 @@ def promote_metric_compare_promoter_in_model_registry( Args: latest_metric: Recently trained model metric results. current_metric: Previously promoted model metric results. - latest_version: Recently trained model version. - current_version:Previously promoted model version. 
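The promotion decision that follows boils down to a single comparison in which ties favor the freshly trained version; a tiny sketch of that rule with illustrative numbers:

```python
def should_promote_latest(latest_metric: float, current_metric: float) -> bool:
    # Mirrors the `latest_metric >= current_metric` check in the step: ties promote latest.
    return latest_metric >= current_metric


assert should_promote_latest(0.91, 0.90)      # latest wins
assert should_promote_latest(0.90, 0.90)      # tie, still promote latest
assert not should_promote_latest(0.89, 0.90)  # current stays promoted
```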
- """ ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### - pipeline_extra = get_step_context().pipeline_run.config.extra should_promote = True - if latest_version == current_version: + # Get model version numbers from Model Control Plane + latest_version, current_version = get_model_versions(target_env) + + if latest_version.number == current_version.number: logger.info("No current model version found - promoting latest") else: logger.info( @@ -57,31 +54,31 @@ def promote_metric_compare_promoter_in_model_registry( ) if latest_metric >= current_metric: logger.info( - "Latest model versions outperformed current versions - promoting latest" + "Latest model version outperformed current version - promoting latest" ) else: logger.info( - "Current model versions outperformed latest versions - keeping current" + "Current model version outperformed latest version - keeping current" ) should_promote = False - promoted_version = current_version + promoted_version = get_model_registry_version(current_version) if should_promote: - if latest_version != current_version: - model_registry.update_model_version( - name=pipeline_extra["mlflow_model_name"], - version=current_version, - stage=ModelVersionStage.ARCHIVED, - ) - model_registry.update_model_version( - name=pipeline_extra["mlflow_model_name"], - version=latest_version, - stage=ModelVersionStage(pipeline_extra["target_env"]), + # Promote in Model Control Plane + model_version = get_step_context().model_config._get_model_version() + model_version.set_stage(stage=target_env, force=True) + logger.info(f"Current model version was promoted to '{target_env}'.") + + # Promote in Model Registry + promote_in_model_registry( + latest_version=get_model_registry_version(latest_version), + current_version=get_model_registry_version(current_version), + model_name=mlflow_model_name, + target_env=target_env.capitalize(), ) - promoted_version = latest_version + promoted_version = get_model_registry_version(latest_version) logger.info( - f"Current model version in `{pipeline_extra['target_env']}` is `{promoted_version}`" + f"Current model version in `{target_env}` is `{promoted_version}` registered in Model Registry" ) ### YOUR CODE ENDS HERE ### - return should_promote, int(promoted_version) diff --git a/template/steps/promotion/{% if not metric_compare_promotion %}promote_latest_in_model_registry.py{% endif %} b/template/steps/promotion/{% if not metric_compare_promotion %}promote_latest_in_model_registry.py{% endif %} deleted file mode 100644 index 02f6ec9..0000000 --- a/template/steps/promotion/{% if not metric_compare_promotion %}promote_latest_in_model_registry.py{% endif %} +++ /dev/null @@ -1,52 +0,0 @@ -# {% include 'template/license_header' %} - -from typing_extensions import Annotated -from zenml import get_step_context, step -from zenml.client import Client -from zenml.logger import get_logger -from zenml.model_registries.base_model_registry import ModelVersionStage - -logger = get_logger(__name__) - -model_registry = Client().active_stack.model_registry - - -@step -def promote_latest_in_model_registry( - latest_version:str, - current_version:str, -) -> Annotated[int,"promoted_version"]: - """Promote latest trained model. - - This is an example of a model promotion step, which promotes the - latest trained model to the current version. - - Args: - latest_version: Recently trained model version. - current_version: Current model version, if present. 
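Once a version has been promoted this way, downstream pipelines can resolve it by stage rather than by number. A hedged consumer-side sketch that reuses only calls appearing elsewhere in this template; the model name and stage are illustrative:

```python
from zenml.model import ModelConfig

# Resolve whichever Model Control Plane version currently holds the "production" stage
# and load the trained model object that was linked to it during training.
promoted_version = ModelConfig(name="my_model", version="production")._get_model_version()
classifier = promoted_version.get_model_object("model").load()
```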
- - """ - - ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### - logger.info(f"Promoting latest model version `{latest_version}`") - pipeline_extra = get_step_context().pipeline_run.config.extra - if latest_version != current_version: - model_registry.update_model_version( - name=pipeline_extra["mlflow_model_name"], - version=current_version, - stage=ModelVersionStage(ModelVersionStage.ARCHIVED), - metadata={}, - ) - model_registry.update_model_version( - name=pipeline_extra["mlflow_model_name"], - version=latest_version, - stage=ModelVersionStage(pipeline_extra["target_env"]), - metadata={}, - ) - promoted_version = latest_version - - logger.info( - f"Current model version in `{pipeline_extra['target_env']}` is `{promoted_version}`" - ) - ### YOUR CODE ENDS HERE ### - return int(promoted_version) diff --git a/template/steps/promotion/{% if not metric_compare_promotion %}promote_latest_version.py{% endif %} b/template/steps/promotion/{% if not metric_compare_promotion %}promote_latest_version.py{% endif %} new file mode 100644 index 0000000..17299a1 --- /dev/null +++ b/template/steps/promotion/{% if not metric_compare_promotion %}promote_latest_version.py{% endif %} @@ -0,0 +1,39 @@ +# {% include 'template/license_header' %} + +from zenml import get_step_context, step +from zenml.logger import get_logger + +from utils import get_model_versions, promote_in_model_registry, get_model_registry_version + +logger = get_logger(__name__) + + +@step +def promote_latest_version( + mlflow_model_name: str, + target_env: str +) -> None: + """Promote latest trained model. + + This is an example of a model promotion step, which promotes the + latest trained model to the current version. + """ + + ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### + # Get model version numbers from Model Control Plane + latest_version, current_version = get_model_versions(target_env) + logger.info(f"Promoting latest model version `{latest_version}`") + + # Promote in Model Control Plane + model_version = get_step_context().model_config._get_model_version() + model_version.set_stage(stage=target_env, force=True) + logger.info(f"Current model version was promoted to '{target_env}'.") + + # Promote in Model Registry + promote_in_model_registry( + latest_version=get_model_registry_version(latest_version), + current_version=get_model_registry_version(current_version), + model_name=mlflow_model_name, + target_env=target_env.capitalize(), + ) + ### YOUR CODE ENDS HERE ### diff --git a/template/steps/training/model_evaluator.py b/template/steps/training/model_evaluator.py index 8d0d0f8..f31458d 100644 --- a/template/steps/training/model_evaluator.py +++ b/template/steps/training/model_evaluator.py @@ -1,9 +1,6 @@ # {% include 'template/license_header' %} -from typing import Tuple -from typing_extensions import Annotated - import mlflow import pandas as pd from sklearn.base import ClassifierMixin @@ -25,7 +22,7 @@ def model_evaluator( min_train_accuracy: float = 0.0, min_test_accuracy: float = 0.0, fail_on_accuracy_quality_gates: bool = False, -): +) -> None: """Evaluate a trained model. 
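The evaluator body sits outside this hunk; given its parameters, one plausible shape for the accuracy quality gates is sketched below. This is illustrative only, not the template's actual implementation, and the threshold values are made up:

```python
# Illustrative quality gate, not the template's evaluator body.
def check_quality_gates(
    trn_acc: float,
    tst_acc: float,
    min_train_accuracy: float = 0.0,
    min_test_accuracy: float = 0.0,
    fail_on_accuracy_quality_gates: bool = False,
) -> None:
    messages = []
    if trn_acc < min_train_accuracy:
        messages.append(f"train accuracy {trn_acc:.3f} below {min_train_accuracy:.3f}")
    if tst_acc < min_test_accuracy:
        messages.append(f"test accuracy {tst_acc:.3f} below {min_test_accuracy:.3f}")
    if messages and fail_on_accuracy_quality_gates:
        raise RuntimeError("; ".join(messages))


check_quality_gates(0.95, 0.88, min_train_accuracy=0.9, min_test_accuracy=0.85)
```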
This is an example of a model evaluation step that takes in a model artifact diff --git a/template/steps/training/model_trainer.py b/template/steps/training/model_trainer.py index fb26e0d..9896165 100644 --- a/template/steps/training/model_trainer.py +++ b/template/steps/training/model_trainer.py @@ -6,11 +6,12 @@ import mlflow import pandas as pd from sklearn.base import ClassifierMixin -from zenml import step +from zenml import step, log_artifact_metadata from zenml.client import Client from zenml.integrations.mlflow.experiment_trackers import MLFlowExperimentTracker from zenml.logger import get_logger from zenml.model import ModelArtifactConfig +from zenml.integrations.mlflow.steps.mlflow_registry import mlflow_register_model_step logger = get_logger(__name__) @@ -30,6 +31,7 @@ def model_trainer( dataset_trn: pd.DataFrame, model: ClassifierMixin, target: str, + name: str, ) -> Annotated[ClassifierMixin, "model", ModelArtifactConfig()]: """Configure and train a model on the training dataset. @@ -56,6 +58,7 @@ def model_trainer( dataset_trn: The preprocessed train dataset. model: The model instance to train. target: Name of target columns in dataset. + name: The name of the model. Returns: The trained model artifact. @@ -70,6 +73,19 @@ def model_trainer( dataset_trn.drop(columns=[target]), dataset_trn[target], ) + + # register mlflow model + mlflow_register_model_step.entrypoint( + model, + name=name, + ) + # keep track of mlflow version for future use + log_artifact_metadata( + output_name="model", + model_registry_version=Client() + .active_stack.model_registry.list_model_versions(name=name)[-1] + .version, + ) ### YOUR CODE ENDS HERE ### return model diff --git a/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_select_best_model.py b/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_select_best_model.py index 0f0bcc3..ea78e7f 100644 --- a/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_select_best_model.py +++ b/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_select_best_model.py @@ -6,40 +6,36 @@ from sklearn.base import ClassifierMixin from zenml import get_step_context, step -from zenml.client import Client from zenml.logger import get_logger logger = get_logger(__name__) @step -def hp_tuning_select_best_model( - search_steps_prefix: str, -) -> Annotated[ClassifierMixin, "best_model"]: +def hp_tuning_select_best_model() -> Annotated[ClassifierMixin, "best_model"]: """Find best model across all HP tuning attempts. - This is an example of a model hyperparameter tuning step that takes - in prefix of steps called previously to search for best hyperparameters. - It will loop other them and find best model of all according to metric. - - Args: - search_steps_prefix: Prefix of steps used for grid search before. + This is an example of a model hyperparameter tuning step that loops + other artifacts linked to model version in Model Control Plane to find + the best hyperparameter tuning output model of all according to the metric. Returns: The best possible model class and its' parameters. 
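The `model_trainer` change above writes the MLflow registry version as artifact metadata, and `get_model_registry_version` (added in `template/utils/model_versions.py` further down) reads it back. A fragment showing both halves of that round trip; it assumes a running step context on the write side and an already resolved Model Control Plane version `model_version` on the read side, and the version string is illustrative:

```python
from zenml import log_artifact_metadata

# Write side (inside model_trainer): attach the registry version to the "model" output.
log_artifact_metadata(output_name="model", model_registry_version="3")  # "3" is illustrative

# Read side (given a resolved Model Control Plane version `model_version`):
registry_version = (
    model_version.get_model_object("model").metadata["model_registry_version"].value
)
```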
""" ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### - run_name = get_step_context().pipeline_run.name - run = Client().get_pipeline_run(run_name) + model_version = get_step_context().model_config._get_model_version() best_model = None best_metric = -1 - for run_step_name, run_step in run.steps.items(): - if run_step_name.startswith(search_steps_prefix): - if "best_model" in run_step.outputs: - model: ClassifierMixin = run_step.outputs["best_model"].load() - metric: float = run_step.outputs["metric"].load() - if best_model is None or best_metric < metric: - best_model = model + # consume artifacts attached to current model version in Model Control Plane + for full_artifact_name in model_version.artifact_object_ids: + # if artifacts comes from one of HP tuning steps + if full_artifact_name.endswith("hp_result"): + hp_output = model_version.artifacts[full_artifact_name]["1"] + model: ClassifierMixin = hp_output.load() + # fetch metadata we attached earlier + metric = float(hp_output.metadata["metric"].value) + if best_model is None or best_metric < metric: + best_model = model ### YOUR CODE ENDS HERE ### return best_model diff --git a/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_single_search.py b/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_single_search.py index e088d50..e88ed42 100644 --- a/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_single_search.py +++ b/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_single_search.py @@ -1,17 +1,17 @@ # {% include 'template/license_header' %} -from typing import Any, Dict, Tuple +from typing import Any, Dict from typing_extensions import Annotated import pandas as pd from sklearn.base import ClassifierMixin from sklearn.metrics import accuracy_score from sklearn.model_selection import RandomizedSearchCV -from zenml import step +from zenml import step, log_artifact_metadata from zenml.logger import get_logger -from utils.get_model_from_config import get_model_from_config +from utils import get_model_from_config logger = get_logger(__name__) @@ -24,7 +24,7 @@ def hp_tuning_single_search( dataset_trn: pd.DataFrame, dataset_tst: pd.DataFrame, target: str, -) -> Tuple[Annotated[ClassifierMixin, "best_model"], Annotated[float, "metric"]]: +) -> Annotated[ClassifierMixin, "hp_result"]: """Evaluate a trained model. This is an example of a model hyperparameter tuning step that takes @@ -47,7 +47,7 @@ def hp_tuning_single_search( target: Name of target columns in dataset. Returns: - The best possible model parameters for given config. + The best possible model for given config. 
""" model_class = get_model_from_config(model_package, model_class) @@ -79,5 +79,10 @@ def hp_tuning_single_search( cv.fit(X=X_trn, y=y_trn) y_pred = cv.predict(X_tst) score = accuracy_score(y_tst, y_pred) + # log score along with output artifact as metadata + log_artifact_metadata( + output_name="hp_result", + metric=float(score), + ) ### YOUR CODE ENDS HERE ### - return cv.best_estimator_, score + return cv.best_estimator_ diff --git a/template/utils/__init__.py b/template/utils/__init__.py new file mode 100644 index 0000000..ca47d21 --- /dev/null +++ b/template/utils/__init__.py @@ -0,0 +1,6 @@ +# {% include 'template/license_header' %} + + +from .get_model_from_config import get_model_from_config +from .model_versions import get_model_versions, get_model_registry_version +from .promote_in_model_registry import promote_in_model_registry diff --git a/template/utils/misc.py b/template/utils/misc.py deleted file mode 100644 index 936a57f..0000000 --- a/template/utils/misc.py +++ /dev/null @@ -1,33 +0,0 @@ -# {% include 'template/license_header' %} - - -import string - -import pandas as pd -from sklearn.datasets import make_classification - - -def generate_random_data(n_samples: int) -> pd.DataFrame: - """Generate random data for model input. - - Args: - n_samples: Number of records to generate. - - Returns: - pd.DataFrame: Generated dataset for classification task. - """ - n_features = 20 - X, y = make_classification( - n_samples=n_samples, - n_features=n_features, - n_classes=2, - random_state=42, - ) - dataset = pd.concat( - [ - pd.DataFrame(X, columns=list(string.ascii_uppercase[:n_features])), - pd.Series(y, name="target"), - ], - axis=1, - ) - return dataset diff --git a/template/utils/model_versions.py b/template/utils/model_versions.py new file mode 100644 index 0000000..083926b --- /dev/null +++ b/template/utils/model_versions.py @@ -0,0 +1,43 @@ +# {% include 'template/license_header' %} + +from typing import Tuple +from typing_extensions import Annotated +from zenml import get_step_context +from zenml.model import ModelConfig +from zenml.models.model_models import ModelVersionResponseModel + + +def get_model_versions( + target_env: str, +) -> Tuple[ + Annotated[ModelVersionResponseModel, "latest_version"], + Annotated[ModelVersionResponseModel, "current_version"], +]: + """Get latest and currently promoted model versions from Model Control Plane. + + Args: + target_env: Target stage to search for currently promoted version + + Returns: + Latest and currently promoted model versions from the Model Control Plane + """ + latest_version = get_step_context().model_config._get_model_version() + try: + current_version = ModelConfig( + name=latest_version.model.name, version=target_env + )._get_model_version() + except KeyError: + current_version = latest_version + + return latest_version, current_version + + +def get_model_registry_version(model_version: ModelVersionResponseModel): + """Get model version in model registry from metadata of a model in the Model Control Plane. 
+ + Args: + model_version: the Model Control Plane version response + """ + return ( + model_version.get_model_object("model").metadata["model_registry_version"].value + ) diff --git a/template/utils/promote_in_model_registry.py b/template/utils/promote_in_model_registry.py new file mode 100644 index 0000000..aedf995 --- /dev/null +++ b/template/utils/promote_in_model_registry.py @@ -0,0 +1,35 @@ +# {% include 'template/license_header' %} + + +from zenml.client import Client +from zenml.logger import get_logger +from zenml.model_registries.base_model_registry import ModelVersionStage + +logger = get_logger(__name__) + + +def promote_in_model_registry( + latest_version: str, current_version: str, model_name: str, target_env: str +): + """Promote model version in model registry to a given stage. + + Args: + latest_version: version to be promoted + current_version: currently promoted version + model_name: name of the model in registry + target_env: stage for promotion + """ + model_registry = Client().active_stack.model_registry + if latest_version != current_version: + model_registry.update_model_version( + name=model_name, + version=current_version, + stage=ModelVersionStage(ModelVersionStage.ARCHIVED), + metadata={}, + ) + model_registry.update_model_version( + name=model_name, + version=latest_version, + stage=ModelVersionStage(target_env), + metadata={}, + ) diff --git a/tests/test_template.py b/tests/test_template.py index ca3429b..fc6f836 100644 --- a/tests/test_template.py +++ b/tests/test_template.py @@ -78,7 +78,7 @@ def generate_and_run_project( # MLFlow Deployer not supported on Windows # MLFlow `service daemon is not running` error on MacOS - if platform.system().lower() not in ["windows", "macos", "darwin"]: + if platform.system().lower() not in ["windows"]: # run the project call = [sys.executable, "run.py"]
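For completeness, a hypothetical call to the new registry helper, mirroring how `promote_with_metric_compare` invokes it; the version numbers and model name are illustrative, and the stage is title-cased because the calling step passes `target_env.capitalize()`:

```python
from utils import promote_in_model_registry

promote_in_model_registry(
    latest_version="5",
    current_version="4",
    model_name="my_mlflow_model",
    target_env="Staging",  # registry stages are title-cased, hence .capitalize() upstream
)
```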