What would you need to get a quick understanding of the ZenML framework and start building your ML pipelines? The answer is a comprehensive project template to cover major use cases of ZenML: a collection of steps and pipelines and, to top it all off, a simple but useful CLI. This is exactly what the ZenML templates are all about.
This project template is a great starting point for anyone looking to deepen their knowledge of ZenML. It consists of three pipelines with the following high-level setup:
All pipelines leverage the Model Control Plane to bring all parts together. The training pipeline creates and promotes a new Model Control Plane version with a trained model object in it. The deployment pipeline uses the inference Model Control Plane version (the one promoted during training) to create a deployment service. The inference pipeline uses the deployment service from the inference Model Control Plane version and stores a new set of predictions as a versioned data artifact for future use. This keeps the pipelines closely connected while ensuring that only quality-assured Model Control Plane versions are used to produce predictions delivered to stakeholders.
- [CT] Training
- Load, split, and preprocess the training dataset
- Search for an optimal model object architecture and tune its hyperparameters
- Train the model object and evaluate its performance on the holdout set
- Compare a recently trained model object with one promoted earlier
- If the recently trained model object performs better, stage it as the new inference model object in the model registry
- On success, stage the newly created Model Control Plane version as the one used for inference
- [CD] Deployment
- Deploy a new prediction service based on the model object connected to the inference Model Control Plane version.
- [CD] Batch Inference
- Load the inference dataset and preprocess it, reusing the object fitted during training
- Perform data drift analysis, reusing the training dataset of the inference Model Control Plane version as a reference
- Run predictions using a model object from the inference Model Control Plane version
- Store predictions as a versioned artifact and link it to the inference Model Control Plane version
It showcases the core ZenML concepts for supervised ML with batch predictions:
- designing ZenML pipeline steps
- using step parameterization and step caching to design flexible and reusable steps
- using custom data types for your artifacts and writing materializers for them
- constructing and running a ZenML pipeline
- usage of ZenML Model Control Plane
- best practices for implementing and running reproducible and reliable ML pipelines with ZenML
In addition to that, the entire project is implemented with the scikit-learn library and showcases how to use ZenML with a popular ML framework. It makes heavy use of the tabular datasets and classification models that scikit-learn provides, but the concepts and patterns it showcases apply to any other ML framework.
Parameter | Description | Default |
---|---|---|
Name | The name of the person/entity holding the copyright | ZenML GmbH |
Email | The email of the person/entity holding the copyright | [email protected] |
Project Name | Short name for your project | ZenML E2E project |
Project Version | The version of your project | 0.0.1 |
Project License | The license under which your project will be released (one of Apache Software License 2.0, MIT license, BSD license, ISC license, GNU General Public License v3 and Not open source) | Apache Software License 2.0 |
Technical product name | The technical name to prefix all tech assets (pipelines, models, etc.) | e2e_use_case |
Target environment | The target environment for deployments/promotions (one of staging, production) | staging |
Use hyperparameter tuning | Whether to use hyperparameter tuning or not | yes |
Use metric-based promotion | Whether to compare metric of interest to make model version promotion | yes |
Use data quality checks | Whether to use data quality checks based on Evidently report to assess data before inference | yes |
Notifications on failure | Whether to notify about pipelines failures | yes |
Notifications on success | Whether to notify about pipelines successes | no |
Remote ZenML Server URL | Optional URL of a remote ZenML server for support scripts | - |
First, to use the templates, you need to have ZenML and its templates extras installed:
pip install "zenml[templates]"
Now you can generate a project from one of the existing templates by using the `--template` flag with the `zenml init` command:
zenml init --template <short_name_of_template>
# example: zenml init --template e2e_batch
Running the command above will show you a series of input prompts. If you would rather rely on the default values for the ZenML project template, you can add `--template-with-defaults` to the same command, like this:
zenml init --template <short_name_of_template> --template-with-defaults
# example: zenml init --template e2e_batch --template-with-defaults
We will go section by section, diving into implementation details and sharing tips and best practices along the way.
The training pipeline is designed to create a new Model Control Plane version and promote it to the inference stage once it passes quality assurance at the end of the pipeline. This ensures that we always infer only on a quality-assured Model Control Plane version and makes the required artifacts of this version seamlessly available to later inference runs.
This is achieved by providing this configuration in `train_config.yaml`, used to configure our pipeline:
model:
  name: your_product_name
Usually, at the very beginning of every training pipeline, developers acquire data to work with in later stages. In this example, we use the Breast Cancer Dataset to showcase the steps while avoiding high computational costs.
The first `data_loader` step downloads the data, which is passed to the `train_data_splitter` step responsible for splitting it into train and test sets to avoid target leakage during data cleaning. The next `train_data_preprocess` step prepares a `sklearn.pipeline.Pipeline` object based on the training dataset and also applies it to the testing set to form ready-to-use datasets.
We also output `preprocess_pipeline` as an output artifact from `train_data_preprocess`; it will be passed into the inference pipeline later on to prepare the inference data using the same fitted pipeline from training. The sklearn `Pipeline` comes in really handy for performing consistent, repeatable data manipulations on top of a pandas `DataFrame` or similar structures.
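The fit-on-train, reuse-everywhere pattern described above can be illustrated without any framework. The following is a minimal sketch, not the template's code: `MeanImputerScaler` is a hypothetical stand-in for a fitted sklearn `Pipeline`, and the toy rows are invented for illustration.

```python
from statistics import mean, pstdev
from typing import List, Optional

Row = List[Optional[float]]


class MeanImputerScaler:
    """Hypothetical stand-in for a fitted preprocessing pipeline."""

    def fit(self, rows: List[Row]) -> "MeanImputerScaler":
        columns = list(zip(*rows))
        # Statistics are learned from the rows passed to fit() only.
        self.means_ = [mean(v for v in col if v is not None) for col in columns]
        self.stds_ = []
        for col, m in zip(columns, self.means_):
            filled = [m if v is None else v for v in col]
            # Fall back to 1.0 for constant columns to avoid division by zero.
            self.stds_.append(pstdev(filled) or 1.0)
        return self

    def transform(self, rows: List[Row]) -> List[List[float]]:
        # Impute missing values with the fitted mean, then standardize.
        return [
            [((m if v is None else v) - m) / s
             for v, m, s in zip(row, self.means_, self.stds_)]
            for row in rows
        ]


# Toy data: the split happens before any statistics are computed.
train = [[1.0, 10.0], [3.0, None], [5.0, 30.0]]
test = [[None, 20.0]]

preprocess_pipeline = MeanImputerScaler().fit(train)  # fit on train only
train_ready = preprocess_pipeline.transform(train)
test_ready = preprocess_pipeline.transform(test)  # reuse, never refit
```

Because the test rows never influence the fitted means or standard deviations, nothing leaks from the holdout set into preprocessing, and the same fitted object can later be applied to inference data.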
To ensure the high quality of ML models many ML Engineers go for automated hyperparameter tuning or even automated model architecture search. In this example, we are using prepared data from ETL to spin up a search of the best model parameters for different architectures in parallel.
To parallelize computationally expensive operations, we loop over predefined candidate architectures and their respective parameter search grids and create one step for each candidate. Inside each hyperparameter tuning step instance, we run random search cross-validation to find the best parameters and then evaluate the result using the metric of interest (`accuracy_score` in this example). We attach the computed metric to the output artifact as metadata to be used later in `hp_tuning_select_best_model`.
Code snippet 💻
from sklearn.metrics import accuracy_score
from zenml import log_metadata

score = accuracy_score(y_tst, y_pred)
# log score along with output artifact as metadata
log_metadata(
    metadata={"metric": float(score)},
    artifact_name="hp_result",
)
After the steps are executed, we need to collect the results (one best model per search step) in the `hp_tuning_select_best_model` step to determine the final winner and pass it to training. We use the Model Control Plane capabilities to pull the correct artifacts from previous steps and fetch their metadata for final evaluation before actual training.
Code snippet 💻
from sklearn.base import ClassifierMixin
from zenml import get_step_context

model = get_step_context().model
best_model = None
best_metric = -1
# consume artifacts attached to current model version in Model Control Plane
for step_name in step_names:
    hp_output = model.get_data_artifact(
        step_name=step_name, name="hp_result"
    )
    # use a separate name so the Model Control Plane handle is not overwritten
    candidate: ClassifierMixin = hp_output.load()
    # fetch metadata we attached earlier
    metric = float(hp_output.run_metadata["metric"])
    if best_model is None or best_metric < metric:
        best_model = candidate
        # remember the winning metric so later candidates are compared against it
        best_metric = metric
To ensure that the collection goes smoothly and in full, we use an `after` statement populated with all search step names, so the selector job waits for the completion of all searches.
You can find more information about the current state of Hyperparameter Tuning using ZenML in the documentation.
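The fan-out and fan-in flow of this section can be sketched in plain Python. This is an illustration only: `SEARCH_SPACE`, `toy_score`, and `random_search` are hypothetical names, and the scoring function is invented to stand in for the cross-validated accuracy that the real tuning steps compute.

```python
import random
from typing import Any, Dict, List, Tuple

# Hypothetical search space: one entry per candidate architecture.
SEARCH_SPACE: Dict[str, Dict[str, List[Any]]] = {
    "random_forest": {"n_estimators": [50, 100, 200], "max_depth": [2, 4, 8]},
    "decision_tree": {"max_depth": [2, 4, 8], "criterion": ["gini", "entropy"]},
}


def toy_score(name: str, params: Dict[str, Any]) -> float:
    """Invented scoring function standing in for cross-validated accuracy."""
    base = 0.90 if name == "random_forest" else 0.85
    return base + 0.01 * params["max_depth"] / 8


def random_search(
    name: str, grid: Dict[str, List[Any]], n_iter: int, seed: int
) -> Tuple[Dict[str, Any], float]:
    """Sample n_iter random configurations and keep the best-scoring one."""
    rng = random.Random(seed)
    best_params: Dict[str, Any] = {}
    best_metric = float("-inf")
    for _ in range(n_iter):
        params = {key: rng.choice(values) for key, values in grid.items()}
        metric = toy_score(name, params)
        if metric > best_metric:
            best_params, best_metric = params, metric
    return best_params, best_metric


# Fan out: one independent search per candidate (one step each in a pipeline).
results = {
    name: random_search(name, grid, n_iter=5, seed=42)
    for name, grid in SEARCH_SPACE.items()
}

# Fan in: runs only once every search has finished, then picks the winner.
best_name = max(results, key=lambda name: results[name][1])
best_params, best_metric = results[best_name]
```

In a real pipeline each `random_search` call would be its own step, and the final selection would be the step that lists all search steps in `after`.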
Having the best model architecture and its hyperparameters defined in the previous section makes it possible to train a quality model object. Model training is also the right place to bring an Experiment Tracker into the picture: we log all metrics and the model object itself into the Experiment Tracker, so we can register our model object in a Model Registry and pass it down to a Model Deployer easily and traceably. We use information from the active stack to make the implementation agnostic of the underlying infrastructure. To make the most of the Model Control Plane, we additionally annotate the output model object as a Model Artifact, so that it is properly categorized for future use and gets additional model object-specific features.
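Code snippet 💻
from sklearn.base import ClassifierMixin
from typing_extensions import Annotated
from zenml import ArtifactConfig, step
from zenml.client import Client

# resolve the tracker from the active stack to stay infrastructure-agnostic
experiment_tracker = Client().active_stack.experiment_tracker

@step(experiment_tracker=experiment_tracker.name)
def model_trainer(
    ...
) -> Annotated[
    ClassifierMixin, ArtifactConfig(name="model", is_model_artifact=True)
]:
    ...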
To notify maintainers of our Model Control Plane model about failures or successful completion of a pipeline, we use the active stack's Alerter component. For failures, it is convenient to use the pipeline hook `on_failure`; for successes, a notification step added as the last step of the pipeline comes in handy.
Code snippet 💻
from zenml import pipeline, step
from zenml.client import Client

alerter = Client().active_stack.alerter

# hook: called when any step of the pipeline fails
def notify_on_failure() -> None:
    alerter.post(message=build_message(status="failed"))

# regular step: never cached, so the notification is sent on every run
@step(enable_cache=False)
def notify_on_success() -> None:
    alerter.post(message=build_message(status="succeeded"))

@pipeline(on_failure=notify_on_failure)
def e2e_example_training(...):
    ...
    promote_with_metric_compare(...)
    notify_on_success(after=["promote_with_metric_compare"])
Once the model object is trained and has been verified to meet basic quality standards, we would like to understand whether it is good enough to beat the existing model object used in inference. This is a very important step, as promoting a weak model object to inference might have a huge negative impact.
In this example, we are implementing promotion based on metric comparison to decide on the spot and avoid more complex approaches like Champion/Challengers shadow deployments. In other projects, other promotion techniques and strategies can be used.
To achieve this, we retrieve two model versions from the Model Control Plane: latest (the one we just trained) and current (the one having the proper tag). Next, we deploy both model objects using the Model Deployer and run predictions on the testing set for both of them. Then we select which of the model registry versions has the better metric value. If the newly trained model performs better, we promote it to the inference stage in the Model Control Plane.
By doing so we ensure that the best-performing version will be used for inference later on and ensure seamless integration of relevant artifacts from the training pipeline in the batch inference pipeline.
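The comparison at the heart of this promotion step can be sketched as a small pure function. This is a hedged illustration rather than the template's implementation: `Candidate` and `select_for_promotion` are hypothetical names, and the rule that ties keep the current version is an assumption made here.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Candidate:
    """Hypothetical record of a model version and its test-set metric."""

    version: str
    metric: float


def select_for_promotion(
    latest: Candidate, current: Optional[Candidate]
) -> Candidate:
    """Return the version that should hold the inference stage."""
    if current is None:
        # Nothing promoted yet: the first trained version wins by default.
        return latest
    # Assumption: promote only on a strict improvement; ties keep `current`.
    return latest if latest.metric > current.metric else current


winner = select_for_promotion(
    latest=Candidate(version="2", metric=0.95),
    current=Candidate(version="1", metric=0.93),
)
print(winner.version)
```

Keeping the decision in a side-effect-free function like this makes the promotion rule easy to unit test separately from deployment and prediction.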
The Deployment pipeline is designed to run in the context of the inference Model Control Plane version. This ensures that we always infer only on a quality-assured Model Control Plane version and provides seamless integration of the required artifacts created during training of this Model Control Plane version.
This is achieved by providing this configuration in `deployer_config.yaml`, used to configure our pipeline:
model:
  name: your_product_name
  version: production
The deployment pipeline takes the model object trained earlier from the inference Model Control Plane version and produces a prediction service, which can be used by external tools or by another pipeline. In this case, it will be used by the batch prediction pipeline later on. The prepared prediction service is linked to the same inference Model Control Plane version.
NOTE: In this template a prediction service is only created for local orchestrators, but you can redefine the step definition to fit the needs of your infrastructure.
The Batch Inference pipeline is designed to run in the context of the inference Model Control Plane version. This ensures that we always infer only on a quality-assured Model Control Plane version and provides seamless integration of the required artifacts created during training of this Model Control Plane version.
This is achieved by providing this configuration in `inference_config.yaml`, used to configure our pipeline:
model:
  name: your_product_name
  version: production
The process of loading data is similar to training; even the same step function is used, but with the `is_inference` flag.
The inference flow has one important difference: there is no need to fit the preprocessing sklearn `Pipeline`. Instead, we reuse the one fitted on the train set during training, to ensure that the model object gets the expected input. To do so, we use the Model interface with lookup by artifact name inside a model context to get the preprocessing pipeline fitted during the quality-assured training run. This is possible because we configured the batch inference pipeline to run inside a Model Control Plane version context.
Code snippet 💻
from zenml import get_pipeline_context

model = get_pipeline_context().model
########## ETL stage ##########
df_inference, target = data_loader(is_inference=True)
df_inference = inference_data_preprocessor(
    dataset_inf=df_inference,
    # this fetches artifact using Model Control Plane
    preprocess_pipeline=model.get_artifact("preprocess_pipeline"),
    target=target,
)
In the drift reporting stage, we use the standard step `evidently_report_step` to build an Evidently report that assesses certain data quality metrics. `evidently_report_step` has a number of options, but for this example we build only the `DataQualityPreset` metrics preset to get the number of NA values in the reference and current datasets.
We pass `dataset_trn` from the training pipeline as the `reference_dataset` here. To do so, we use the Model interface with lookup by artifact name inside a model context to get the training dataset used during the quality-assured training run. This is possible because we configured the batch inference pipeline to run inside a Model Control Plane version context.
After the report is built, we execute another quality gate using the `drift_quality_gate` step, which assesses whether a significant drift in the NA count is observed. If so, execution is stopped with an exception.
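A quality gate of this kind boils down to a comparison that raises on failure. The sketch below is hypothetical: the relative-tolerance rule, its default value, and the names `na_quality_gate` and `DriftDetectedError` are assumptions for illustration, and the template's actual `drift_quality_gate` step may apply a different check.

```python
class DriftDetectedError(RuntimeError):
    """Hypothetical exception raised when the quality gate fails."""


def na_quality_gate(
    reference_na: int, current_na: int, tolerance: float = 0.1
) -> None:
    """Raise if the current NA count exceeds the reference by more than `tolerance`."""
    # Assumed rule: allow a relative increase of up to `tolerance`.
    allowed = reference_na * (1 + tolerance)
    if current_na > allowed:
        raise DriftDetectedError(
            f"NA count drifted: {current_na} missing values in the current "
            f"dataset vs. {reference_na} in the reference "
            f"(allowed: {allowed:.1f})."
        )


# Passes silently: 11 missing values is within 10% of the reference count.
na_quality_gate(reference_na=10, current_na=11)
```

Raising an exception instead of returning a flag means that any downstream step, such as prediction, never runs on data that failed the check.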
You can follow Data Validators docs to get more inspiration on how and when to use drift detection in your pipelines.
As a last step concluding all the work done so far, we calculate predictions on the inference dataset and persist them in the Artifact Store, attached to the current inference model version of the Model Control Plane, for reuse and observability.
We leverage a prepared prediction service called `mlflow_deployment`, linked to the inference model version of the Model Control Plane, to run `.predict()` and return the predictions as the output of the predictions step, so they are automatically stored in the Artifact Store and linked to the Model Control Plane model version as a versioned artifact with zero effort.
NOTE: On non-local orchestrators a `model` artifact will be loaded into memory to run predictions directly. You can adapt this part to your needs.
Code snippet 💻
from typing import Optional

import pandas as pd
from typing_extensions import Annotated
from zenml import get_step_context, step
from zenml.integrations.mlflow.services.mlflow_deployment import (
    MLFlowDeploymentService,
)

@step
def inference_predict(
    dataset_inf: pd.DataFrame,
) -> Annotated[pd.Series, "predictions"]:
    model = get_step_context().model
    # get predictor
    predictor_service: Optional[MLFlowDeploymentService] = model.get_endpoint_artifact(
        "mlflow_deployment"
    ).load()
    if predictor_service is not None:
        # run prediction from service
        predictions = predictor_service.predict(request=dataset_inf)
    else:
        # run prediction from memory
        predictor = model.get_model_artifact("model").load()
        predictions = predictor.predict(dataset_inf)
    predictions = pd.Series(predictions, name="predicted")
    return predictions