From 80aaa6c47057087ccbde86c04ddb3487f66dd598 Mon Sep 17 00:00:00 2001 From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com> Date: Thu, 19 Oct 2023 10:05:22 +0200 Subject: [PATCH] YAML based config (#14) * fix ExternalArtifact behavior * update zenml-ref * fix ref name * YAML based template configuration * update docs * fix config * verbose tests output * verbose tests output * no fail fast * verbose tests output * bump timeouts for mlflow * try replace * testing inside python * restore test flow * update requirements * properly skip macos * revert ref --- .github/pull_request_template.md | 16 ++++ .github/workflows/ci.yml | 1 + README.md | 2 +- requirements.txt | 3 +- template/artifacts/__init__.py | 1 - template/artifacts/materializer.py | 89 ----------------- template/artifacts/model_metadata.py | 62 ------------ template/config.py | 87 ----------------- template/config.yaml | 78 +++++++++++++++ template/main.py | 95 +++++++++++++++++++ template/pipelines/batch_inference.py | 17 ++-- template/pipelines/training.py | 48 ++++++---- template/run.py | 24 +++-- .../inference_get_current_version.py | 12 +-- .../steps/promotion/promote_get_versions.py | 11 ++- ...mote_metric_compare_promoter.py{% endif %} | 12 +-- ...e_promotion %}promote_latest.py{% endif %} | 15 ++- template/steps/training/model_trainer.py | 12 +-- .../hp_tuning_select_best_model.py | 21 ++-- .../hp_tuning_single_search.py | 41 +++++--- template/utils/get_model_from_config.py | 17 ++++ tests/test_template.py | 11 ++- 22 files changed, 328 insertions(+), 347 deletions(-) create mode 100644 .github/pull_request_template.md delete mode 100644 template/artifacts/__init__.py delete mode 100644 template/artifacts/materializer.py delete mode 100644 template/artifacts/model_metadata.py delete mode 100644 template/config.py create mode 100644 template/config.yaml create mode 100644 template/main.py create mode 100644 template/utils/get_model_from_config.py diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md new file mode 100644 index 0000000..ae71ac2 --- /dev/null +++ b/.github/pull_request_template.md @@ -0,0 +1,16 @@ +## Describe changes +I implemented/fixed _ to achieve _. + +## Pre-requisites +Please ensure you have done the following: +- [ ] If my change requires a change to docs, I have updated the documentation accordingly. +- [ ] I have added tests to cover my changes. 
+- [ ] I have updated `ref-zenml` in `.github/workflows/ci.yml` accordingly (if you don't know - `main` would be a solid choice) + +## Types of changes + +- [ ] Bug fix (non-breaking change which fixes an issue) +- [ ] New feature (non-breaking change which adds functionality) +- [ ] Breaking change (fix or feature that would cause existing functionality to change) +- [ ] Other (add details above) + diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index abb636c..6fe0ba6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,6 +18,7 @@ jobs: run-tests: runs-on: ${{ matrix.os }} strategy: + fail-fast: false matrix: stack-name: [local] os: [windows-latest, ubuntu-latest, macos-latest] diff --git a/README.md b/README.md index 1f7b41c..3f7c7f3 100644 --- a/README.md +++ b/README.md @@ -209,7 +209,7 @@ df_inference, target = data_loader(is_inference=True) df_inference = inference_data_preprocessor( dataset_inf=df_inference, preprocess_pipeline=ExternalArtifact( - pipeline_name=MetaConfig.pipeline_name_training, + pipeline_name="your_product_name_training", artifact_name="preprocess_pipeline", ), target=target, diff --git a/requirements.txt b/requirements.txt index 3f1435b..96c96d8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1 @@ -copier -jinja2-time +zenml[templates] diff --git a/template/artifacts/__init__.py b/template/artifacts/__init__.py deleted file mode 100644 index 4bc11e5..0000000 --- a/template/artifacts/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# {% include 'template/license_header' %} diff --git a/template/artifacts/materializer.py b/template/artifacts/materializer.py deleted file mode 100644 index 40c89a3..0000000 --- a/template/artifacts/materializer.py +++ /dev/null @@ -1,89 +0,0 @@ -# {% include 'template/license_header' %} - - -import json -import os -from typing import Type - -from zenml.enums import ArtifactType -from zenml.io import fileio -from zenml.materializers.base_materializer import BaseMaterializer - -from artifacts.model_metadata import ModelMetadata - - -class ModelMetadataMaterializer(BaseMaterializer): - ASSOCIATED_TYPES = (ModelMetadata,) - ASSOCIATED_ARTIFACT_TYPE = ArtifactType.STATISTICS - - def load(self, data_type: Type[ModelMetadata]) -> ModelMetadata: - """Read from artifact store. - - Args: - data_type: What type the artifact data should be loaded as. - - Raises: - ValueError: on deserialization issue - - Returns: - Read artifact. - """ - super().load(data_type) - - ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### - import sklearn.ensemble - import sklearn.linear_model - import sklearn.tree - - modules = [sklearn.ensemble, sklearn.linear_model, sklearn.tree] - - with fileio.open(os.path.join(self.uri, "data.json"), "r") as f: - data_json = json.loads(f.read()) - class_name = data_json["model_class"] - cls = None - for module in modules: - if cls := getattr(module, class_name, None): - break - if cls is None: - raise ValueError( - f"Cannot deserialize `{class_name}` using {self.__class__.__name__}. " - f"Only classes from modules {[m.__name__ for m in modules]} " - "are supported" - ) - data = ModelMetadata(cls) - if "search_grid" in data_json: - data.search_grid = data_json["search_grid"] - if "params" in data_json: - data.params = data_json["params"] - if "metric" in data_json: - data.metric = data_json["metric"] - ### YOUR CODE ENDS HERE ### - - return data - - def save(self, data: ModelMetadata) -> None: - """Write to artifact store. - - Args: - data: The data of the artifact to save. 
- """ - super().save(data) - - ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### - # Dump the model metadata directly into the artifact store as a JSON file - data_json = dict() - with fileio.open(os.path.join(self.uri, "data.json"), "w") as f: - data_json["model_class"] = data.model_class.__name__ - if data.search_grid: - data_json["search_grid"] = {} - for k, v in data.search_grid.items(): - if type(v) == range: - data_json["search_grid"][k] = list(v) - else: - data_json["search_grid"][k] = v - if data.params: - data_json["params"] = data.params - if data.metric: - data_json["metric"] = data.metric - f.write(json.dumps(data_json)) - ### YOUR CODE ENDS HERE ### diff --git a/template/artifacts/model_metadata.py b/template/artifacts/model_metadata.py deleted file mode 100644 index d3e6ea6..0000000 --- a/template/artifacts/model_metadata.py +++ /dev/null @@ -1,62 +0,0 @@ -# {% include 'template/license_header' %} - - -from typing import Any, Dict - -from sklearn.base import ClassifierMixin - - -class ModelMetadata: - """A custom artifact that stores model metadata. - - A model metadata object gathers together information that is collected - about the model being trained in a training pipeline run. This data type - is used for one of the artifacts returned by the model evaluation step. - - This is an example of a *custom artifact data type*: a type returned by - one of the pipeline steps that isn't natively supported by the ZenML - framework. Custom artifact data types are a common occurrence in ZenML, - usually encountered in one of the following circumstances: - - - you use a third party library that is not covered as a ZenML integration - and you model one or more step artifacts from the data types provided by - this library (e.g. datasets, models, data validation profiles, model - evaluation results/reports etc.) - - you need to use one of your own data types as a step artifact and it is - not one of the basic Python artifact data types supported by the ZenML - framework (e.g. str, int, float, dictionaries, lists, etc.) - - you want to extend one of the artifact data types already natively - supported by ZenML (e.g. pandas.DataFrame or sklearn.ClassifierMixin) - to customize it with your own data and/or behavior. - - In all above cases, the ZenML framework lacks one very important piece of - information: it doesn't "know" how to convert the data into a format that - can be saved in the artifact store (e.g. on a filesystem or persistent - storage service like S3 or GCS). Saving and loading artifacts from the - artifact store is something called "materialization" in ZenML terms and - you need to provide this missing information in the form of a custom - materializer - a class that implements loading/saving artifacts from/to - the artifact store. Take a look at the `materializers` folder to see how a - custom materializer is implemented for this artifact data type. 
-
-    More information about custom step artifact data types and ZenML
-    materializers is available in the docs:
-
-    https://docs.zenml.io/user-guide/advanced-guide/artifact-management/handle-custom-data-types
-
-    """
-
-    ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ###
-    def __init__(
-        self,
-        model_class: ClassifierMixin,
-        search_grid: Dict[str, Any] = None,
-        params: Dict[str, Any] = None,
-        metric: float = None,
-    ) -> None:
-        self.model_class = model_class
-        self.search_grid = search_grid
-        self.params = params
-        self.metric = metric
-
-    ### YOUR CODE ENDS HERE ###
diff --git a/template/config.py b/template/config.py
deleted file mode 100644
index f08b7fc..0000000
--- a/template/config.py
+++ /dev/null
@@ -1,87 +0,0 @@
-# {% include 'template/license_header' %}
-
-
-from artifacts.model_metadata import ModelMetadata
-from pydantic import BaseConfig
-from sklearn.ensemble import RandomForestClassifier
-from sklearn.tree import DecisionTreeClassifier
-from zenml.config import DockerSettings
-from zenml.integrations.constants import (
-    AWS,
-{%- if data_quality_checks %}
-    EVIDENTLY,
-{%- endif %}
-    KUBEFLOW,
-    KUBERNETES,
-    MLFLOW,
-    SKLEARN,
-    SLACK,
-)
-from zenml.model_registries.base_model_registry import ModelVersionStage
-
-PIPELINE_SETTINGS = dict(
-    docker=DockerSettings(
-        required_integrations=[
-            AWS,
-{%- if data_quality_checks %}
-            EVIDENTLY,
-{%- endif %}
-            KUBEFLOW,
-            KUBERNETES,
-            MLFLOW,
-            SKLEARN,
-            SLACK,
-        ],
-    )
-)
-
-DEFAULT_PIPELINE_EXTRAS = dict(
-    notify_on_success={{notify_on_successes}},
-    notify_on_failure={{notify_on_failures}}
-)
-
-
-class MetaConfig(BaseConfig):
-    pipeline_name_training = "{{product_name}}_training"
-    pipeline_name_batch_inference = "{{product_name}}_batch_inference"
-    mlflow_model_name = "{{product_name}}_model"
-{%- if target_environment == 'production' %}
-    target_env = ModelVersionStage.PRODUCTION
-{%- else %}
-    target_env = ModelVersionStage.STAGING
-{%- endif %}
-
-    ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ###
-{%- if hyperparameters_tuning %}
-    # This set contains all the models that you want to evaluate
-    # during hyperparameter tuning stage.
-    model_search_space = {
-        ModelMetadata(
-            RandomForestClassifier,
-            search_grid=dict(
-                criterion=["gini", "entropy"],
-                max_depth=[2, 4, 6, 8, 10, 12],
-                min_samples_leaf=range(1, 10),
-                n_estimators=range(50, 500, 25),
-            ),
-        ),
-        ModelMetadata(
-            DecisionTreeClassifier,
-            search_grid=dict(
-                criterion=["gini", "entropy"],
-                max_depth=[2, 4, 6, 8, 10, 12],
-                min_samples_leaf=range(1, 10),
-            ),
-        ),
-    }
-{%- else %}
-    # This model configuration will be used for the training stage.
-    model_configuration = ModelMetadata(
-        DecisionTreeClassifier,
-        params=dict(
-            criterion="gini",
-            max_depth=5,
-            min_samples_leaf=3,
-        ),
-    )
-{%- endif %}
diff --git a/template/config.yaml b/template/config.yaml
new file mode 100644
index 0000000..bd4c34c
--- /dev/null
+++ b/template/config.yaml
@@ -0,0 +1,80 @@
+# {% include 'template/license_header' %}
+
+settings:
+  docker:
+    required_integrations:
+      - aws
+{%- if data_quality_checks %}
+      - evidently
+{%- endif %}
+      - kubeflow
+      - kubernetes
+      - mlflow
+      - sklearn
+      - slack
+extra:
+  mlflow_model_name: e2e_use_case_model
+{%- if target_environment == 'production' %}
+  target_env: Production
+{%- else %}
+  target_env: Staging
+{%- endif %}
+  notify_on_success: False
+  notify_on_failure: True
+{%- if hyperparameters_tuning %}
+  # This set contains all the models that you want to evaluate
+  # during hyperparameter tuning stage.
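+  # Each named entry below becomes one `hp_tuning_single_search` step;
+  # nested `range` blocks are expanded to Python `range(start, end, step)`.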
+ model_search_space: + random_forest: + model_package: sklearn.ensemble + model_class: RandomForestClassifier + search_grid: + criterion: + - gini + - entropy + max_depth: + - 2 + - 4 + - 6 + - 8 + - 10 + - 12 + min_samples_leaf: + range: + start: 1 + end: 10 + n_estimators: + range: + start: 50 + end: 500 + step: 25 + decision_tree: + model_package: sklearn.tree + model_class: DecisionTreeClassifier + search_grid: + criterion: + - gini + - entropy + max_depth: + - 2 + - 4 + - 6 + - 8 + - 10 + - 12 + min_samples_leaf: + range: + start: 1 + end: 10 +{%- else %} + # This model configuration will be used for the training stage. + model_configuration: + model_package: sklearn.tree + model_class: DecisionTreeClassifier + params: + criterion: gini + max_depth: 5 + min_samples_leaf: 3 +{%- endif %} \ No newline at end of file diff --git a/template/main.py b/template/main.py new file mode 100644 index 0000000..d827559 --- /dev/null +++ b/template/main.py @@ -0,0 +1,95 @@ +# {% include 'template/license_header' %} + +from datetime import datetime as dt +import os +from typing import Optional + +from zenml.artifacts.external_artifact import ExternalArtifact +from zenml.logger import get_logger + +from pipelines import {{product_name}}_batch_inference, {{product_name}}_training + +logger = get_logger(__name__) + + +def main( + no_cache: bool = False, + no_drop_na: bool = False, + no_normalize: bool = False, + drop_columns: Optional[str] = None, + test_size: float = 0.2, + min_train_accuracy: float = 0.8, + min_test_accuracy: float = 0.8, + fail_on_accuracy_quality_gates: bool = False, + only_inference: bool = False, +): + """Main entry point for the pipeline execution. + + This entrypoint is where everything comes together: + + * configuring pipeline with the required parameters + (some of which may come from command line arguments) + * launching the pipeline + + Args: + no_cache: If `True` cache will be disabled. + no_drop_na: If `True` NA values will not be dropped from the dataset. + no_normalize: If `True` normalization will not be done for the dataset. + drop_columns: List of comma-separated names of columns to drop from the dataset. + test_size: Percentage of records from the training dataset to go into the test dataset. + min_train_accuracy: Minimum acceptable accuracy on the train set. + min_test_accuracy: Minimum acceptable accuracy on the test set. + fail_on_accuracy_quality_gates: If `True` and any of minimal accuracy + thresholds are violated - the pipeline will fail. If `False` thresholds will + not affect the pipeline. + only_inference: If `True` only inference pipeline will be triggered. + """ + + # Run a pipeline with the required parameters. This executes + # all steps in the pipeline in the correct order using the orchestrator + # stack component that is configured in your active ZenML stack. 
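+    # `config_path` below points `with_options` at `config.yaml`, which
+    # supplies the Docker settings and the `extra` pipeline parameters.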
+    pipeline_args = {
+        "config_path": os.path.join(
+            os.path.dirname(os.path.realpath(__file__)),
+            "config.yaml",
+        )
+    }
+    if no_cache:
+        pipeline_args["enable_cache"] = False
+
+    if not only_inference:
+        # Execute Training Pipeline
+        run_args_train = {
+            "drop_na": not no_drop_na,
+            "normalize": not no_normalize,
+            "random_seed": 42,
+            "test_size": test_size,
+            "min_train_accuracy": min_train_accuracy,
+            "min_test_accuracy": min_test_accuracy,
+            "fail_on_accuracy_quality_gates": fail_on_accuracy_quality_gates,
+        }
+        if drop_columns:
+            run_args_train["drop_columns"] = drop_columns.split(",")
+
+        pipeline_args[
+            "run_name"
+        ] = f"{{product_name}}_training_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
+        {{product_name}}_training.with_options(**pipeline_args)(**run_args_train)
+        logger.info("Training pipeline finished successfully!")
+
+    # Execute Batch Inference Pipeline
+    run_args_inference = {}
+    pipeline_args[
+        "run_name"
+    ] = f"{{product_name}}_batch_inference_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
+    {{product_name}}_batch_inference.with_options(**pipeline_args)(**run_args_inference)
+
+    artifact = ExternalArtifact(
+        pipeline_name="{{product_name}}_batch_inference",
+        artifact_name="predictions",
+    )
+    logger.info(
+        "Batch inference pipeline finished successfully! "
+        "You can find predictions in Artifact Store using ID: "
+        f"`{str(artifact.get_artifact_id())}`."
+    )
\ No newline at end of file
diff --git a/template/pipelines/batch_inference.py b/template/pipelines/batch_inference.py
index 8f9c26c..e777ee6 100644
--- a/template/pipelines/batch_inference.py
+++ b/template/pipelines/batch_inference.py
@@ -1,7 +1,6 @@
 # {% include 'template/license_header' %}
 
 
-from config import DEFAULT_PIPELINE_EXTRAS, PIPELINE_SETTINGS, MetaConfig
 from steps import (
     data_loader,
{%- if data_quality_checks %}
@@ -13,7 +12,7 @@
     notify_on_failure,
     notify_on_success,
 )
-from zenml import pipeline
+from zenml import get_pipeline_context, pipeline
 from zenml.integrations.evidently.metrics import EvidentlyMetricConfig
 from zenml.integrations.evidently.steps import evidently_report_step
 from zenml.integrations.mlflow.steps.mlflow_deployer import (
@@ -25,11 +24,7 @@
 logger = get_logger(__name__)
 
 
-@pipeline(
-    settings=PIPELINE_SETTINGS,
-    on_failure=notify_on_failure,
-    extra=DEFAULT_PIPELINE_EXTRAS,
-)
+@pipeline(on_failure=notify_on_failure)
 def {{product_name}}_batch_inference():
     """
     Model batch inference pipeline.
@@ -45,7 +40,7 @@ def {{product_name}}_batch_inference(): df_inference = inference_data_preprocessor( dataset_inf=df_inference, preprocess_pipeline=ExternalArtifact( - pipeline_name=MetaConfig.pipeline_name_training, + pipeline_name="{{product_name}}_training", artifact_name="preprocess_pipeline", ), target=target, @@ -55,7 +50,7 @@ def {{product_name}}_batch_inference(): ########## DataQuality stage ########## report, _ = evidently_report_step( reference_dataset=ExternalArtifact( - pipeline_name=MetaConfig.pipeline_name_training, + pipeline_name="{{product_name}}_training", artifact_name="dataset_trn", ), comparison_dataset=df_inference, @@ -69,9 +64,9 @@ def {{product_name}}_batch_inference(): ########## Inference stage ########## registry_model_version = inference_get_current_version() deployment_service = mlflow_model_registry_deployer_step( - registry_model_name=MetaConfig.mlflow_model_name, + registry_model_name=get_pipeline_context().extra["mlflow_model_name"], registry_model_version=registry_model_version, - replace_existing=False, + replace_existing=True, ) inference_predict( deployment_service=deployment_service, diff --git a/template/pipelines/training.py b/template/pipelines/training.py index d80083d..bef85bf 100644 --- a/template/pipelines/training.py +++ b/template/pipelines/training.py @@ -3,8 +3,6 @@ from typing import List, Optional -from artifacts.materializer import ModelMetadataMaterializer -from config import DEFAULT_PIPELINE_EXTRAS, PIPELINE_SETTINGS, MetaConfig from steps import ( data_loader, {%- if hyperparameters_tuning %} @@ -25,23 +23,24 @@ train_data_preprocessor, train_data_splitter, ) -from zenml import pipeline +from zenml import pipeline, get_pipeline_context from zenml.integrations.mlflow.steps.mlflow_deployer import ( mlflow_model_registry_deployer_step, ) from zenml.integrations.mlflow.steps.mlflow_registry import mlflow_register_model_step from zenml.logger import get_logger +{%- if hyperparameters_tuning %} + +{%- else %} from zenml.artifacts.external_artifact import ExternalArtifact +from utils.get_model_from_config import get_model_from_config +{%- endif %} logger = get_logger(__name__) -@pipeline( - settings=PIPELINE_SETTINGS, - on_failure=notify_on_failure, - extra=DEFAULT_PIPELINE_EXTRAS, -) +@pipeline(on_failure=notify_on_failure) def {{product_name}}_training( test_size: float = 0.2, drop_na: Optional[bool] = None, @@ -74,6 +73,7 @@ def {{product_name}}_training( ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### # Link all the steps together by calling them and passing the output # of one step as the input of the next step. 
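+    # `extra` values are defined in the `extra` section of `config.yaml`,
+    # which is passed to the pipelines via `config_path` at run time.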
+    pipeline_extra = get_pipeline_context().extra
     ########## ETL stage ##########
     raw_data, target = data_loader()
     dataset_trn, dataset_tst = train_data_splitter(
@@ -92,30 +92,36 @@
     ########## Hyperparameter tuning stage ##########
     after = []
     search_steps_prefix = "hp_tuning_search_"
-    for i, model_search_configuration in enumerate(MetaConfig.model_search_space):
-        step_name = f"{search_steps_prefix}{i}"
+    for config_name, model_search_configuration in pipeline_extra["model_search_space"].items():
+        step_name = f"{search_steps_prefix}{config_name}"
         hp_tuning_single_search(
-            model_metadata=ExternalArtifact(
-                value=model_search_configuration,
-            ),
             id=step_name,
+            model_package=model_search_configuration["model_package"],
+            model_class=model_search_configuration["model_class"],
+            search_grid=model_search_configuration["search_grid"],
             dataset_trn=dataset_trn,
             dataset_tst=dataset_tst,
             target=target,
         )
         after.append(step_name)
-    best_model_config = hp_tuning_select_best_model(
+    best_model = hp_tuning_select_best_model(
         search_steps_prefix=search_steps_prefix, after=after
     )
+{%- else %}
+    model_configuration = pipeline_extra["model_configuration"]
+    best_model = get_model_from_config(
+        model_package=model_configuration["model_package"],
+        model_class=model_configuration["model_class"],
+    )(**model_configuration["params"])
{%- endif %}
 
     ########## Training stage ##########
     model = model_trainer(
         dataset_trn=dataset_trn,
{%- if hyperparameters_tuning %}
-        model_config=best_model_config,
+        model=best_model,
{%- else %}
-        model_config=ExternalArtifact(value=MetaConfig.model_configuration, materializer=ModelMetadataMaterializer),
+        model=ExternalArtifact(value=best_model),
{%- endif %}
         random_seed=random_seed,
         target=target,
@@ -131,7 +137,7 @@
     )
     mlflow_register_model_step(
         model,
-        name=MetaConfig.mlflow_model_name,
+        name=pipeline_extra["mlflow_model_name"],
     )
 
     ########## Promotion stage ##########
@@ -141,9 +147,9 @@
{%- if metric_compare_promotion %}
     latest_deployment = mlflow_model_registry_deployer_step(
         id="deploy_latest_model_version",
-        registry_model_name=MetaConfig.mlflow_model_name,
+        registry_model_name=pipeline_extra["mlflow_model_name"],
         registry_model_version=latest_version,
-        replace_existing=False,
+        replace_existing=True,
     )
     latest_metric = promote_get_metric(
         id="get_metrics_latest_model_version",
@@ -153,9 +159,9 @@
 
     current_deployment = mlflow_model_registry_deployer_step(
         id="deploy_current_model_version",
-        registry_model_name=MetaConfig.mlflow_model_name,
+        registry_model_name=pipeline_extra["mlflow_model_name"],
         registry_model_version=current_version,
-        replace_existing=False,
+        replace_existing=True,
         after=["get_metrics_latest_model_version"],
     )
     current_metric = promote_get_metric(
diff --git a/template/run.py b/template/run.py
index 49fc089..4be9b6d 100644
--- a/template/run.py
+++ b/template/run.py
@@ -1,13 +1,14 @@
 # {% include 'template/license_header' %}
 
+import click
+from datetime import datetime as dt
+import os
+from typing import Optional
 
 from zenml.artifacts.external_artifact import ExternalArtifact
 from zenml.logger import get_logger
+
 from pipelines import {{product_name}}_batch_inference, {{product_name}}_training
-from config import MetaConfig
-import click
-from typing import Optional
-from datetime import datetime as dt
 
 logger = get_logger(__name__)
 
@@ -139,7 +140,12 @@ def main(
     # Run a pipeline with the required parameters. This executes
     # all steps in the pipeline in the correct order using the orchestrator
     # stack component that is configured in your active ZenML stack.
-    pipeline_args = {}
+    pipeline_args = {
+        "config_path": os.path.join(
+            os.path.dirname(os.path.realpath(__file__)),
+            "config.yaml",
+        )
+    }
     if no_cache:
         pipeline_args["enable_cache"] = False
 
@@ -159,7 +165,7 @@
 
         pipeline_args[
             "run_name"
-        ] = f"{MetaConfig.pipeline_name_training}_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
+        ] = f"{{product_name}}_training_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
         {{product_name}}_training.with_options(**pipeline_args)(**run_args_train)
         logger.info("Training pipeline finished successfully!")
 
@@ -167,17 +173,17 @@
     run_args_inference = {}
     pipeline_args[
         "run_name"
-    ] = f"{MetaConfig.pipeline_name_batch_inference}_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
+    ] = f"{{product_name}}_batch_inference_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
     {{product_name}}_batch_inference.with_options(**pipeline_args)(**run_args_inference)
 
     artifact = ExternalArtifact(
-        pipeline_name=MetaConfig.pipeline_name_batch_inference,
+        pipeline_name="{{product_name}}_batch_inference",
         artifact_name="predictions",
     )
     logger.info(
         "Batch inference pipeline finished successfully! "
         "You can find predictions in Artifact Store using ID: "
-        f"`{str(artifact.id)}`."
+        f"`{str(artifact.get_artifact_id())}`."
     )
 
 
diff --git a/template/steps/inference/inference_get_current_version.py b/template/steps/inference/inference_get_current_version.py
index 0454d87..931f7b4 100644
--- a/template/steps/inference/inference_get_current_version.py
+++ b/template/steps/inference/inference_get_current_version.py
@@ -3,10 +3,10 @@
 
 from typing_extensions import Annotated
 
-from config import MetaConfig
-from zenml import step
+from zenml import get_step_context, step
 from zenml.client import Client
 from zenml.logger import get_logger
+from zenml.model_registries.base_model_registry import ModelVersionStage
 
 logger = get_logger(__name__)
 
@@ -22,13 +22,13 @@ def inference_get_current_version() -> Annotated[str, "model_version"]:
     """
 
     ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ###
-
+    pipeline_extra = get_step_context().pipeline_run.config.extra
     current_version = model_registry.list_model_versions(
-        name=MetaConfig.mlflow_model_name,
-        stage=MetaConfig.target_env,
+        name=pipeline_extra["mlflow_model_name"],
+        stage=ModelVersionStage(pipeline_extra["target_env"]),
     )[0].version
     logger.info(
-        f"Current model version in `{MetaConfig.target_env.value}` is `{current_version}`"
+        f"Current model version in `{pipeline_extra['target_env']}` is `{current_version}`"
     )
     return current_version
 
diff --git a/template/steps/promotion/promote_get_versions.py b/template/steps/promotion/promote_get_versions.py
index 52d3529..bb5ff3c 100644
--- a/template/steps/promotion/promote_get_versions.py
+++ b/template/steps/promotion/promote_get_versions.py
@@ -4,10 +4,10 @@
 from typing import Tuple
 from typing_extensions import Annotated
 
-from config import MetaConfig
-from zenml import step
+from zenml import get_step_context, step
 from zenml.client import Client
 from zenml.logger import get_logger
+from zenml.model_registries.base_model_registry import ModelVersionStage
 
 logger = get_logger(__name__)
 
@@ -30,16 +30,17 @@ def promote_get_versions() -> (
     """
 
     ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ###
+    pipeline_extra = get_step_context().pipeline_run.config.extra
     none_versions = model_registry.list_model_versions(
-        name=MetaConfig.mlflow_model_name,
+        name=pipeline_extra["mlflow_model_name"],
         stage=None,
     )
     latest_versions = none_versions[0].version
     logger.info(f"Latest model version is {latest_versions}")
 
     target_versions = model_registry.list_model_versions(
-        name=MetaConfig.mlflow_model_name,
-        stage=MetaConfig.target_env,
+        name=pipeline_extra["mlflow_model_name"],
+        stage=ModelVersionStage(pipeline_extra["target_env"]),
     )
     current_version = latest_versions
     if target_versions:
diff --git a/template/steps/promotion/{% if metric_compare_promotion %}promote_metric_compare_promoter.py{% endif %} b/template/steps/promotion/{% if metric_compare_promotion %}promote_metric_compare_promoter.py{% endif %}
index 045f1f7..d055885 100644
--- a/template/steps/promotion/{% if metric_compare_promotion %}promote_metric_compare_promoter.py{% endif %}
+++ b/template/steps/promotion/{% if metric_compare_promotion %}promote_metric_compare_promoter.py{% endif %}
@@ -1,8 +1,7 @@
 # {% include 'template/license_header' %}
 
 
-from config import MetaConfig
-from zenml import step
+from zenml import get_step_context, step
 from zenml.client import Client
 from zenml.logger import get_logger
 from zenml.model_registries.base_model_registry import ModelVersionStage
@@ -46,6 +45,7 @@ def promote_metric_compare_promoter(
     """
 
     ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ###
+    pipeline_extra = get_step_context().pipeline_run.config.extra
     should_promote = True
 
     if latest_version == current_version:
@@ -69,18 +69,18 @@ def promote_metric_compare_promoter(
     if should_promote:
         if latest_version != current_version:
             model_registry.update_model_version(
-                name=MetaConfig.mlflow_model_name,
+                name=pipeline_extra["mlflow_model_name"],
                 version=current_version,
                 stage=ModelVersionStage.ARCHIVED,
             )
         model_registry.update_model_version(
-            name=MetaConfig.mlflow_model_name,
+            name=pipeline_extra["mlflow_model_name"],
             version=latest_version,
-            stage=MetaConfig.target_env,
+            stage=ModelVersionStage(pipeline_extra["target_env"]),
         )
         promoted_version = latest_version
     logger.info(
-        f"Current model version in `{MetaConfig.target_env.value}` is `{promoted_version}`"
+        f"Current model version in `{pipeline_extra['target_env']}` is `{promoted_version}`"
     )
 
     ### YOUR CODE ENDS HERE ###
diff --git a/template/steps/promotion/{% if not metric_compare_promotion %}promote_latest.py{% endif %} b/template/steps/promotion/{% if not metric_compare_promotion %}promote_latest.py{% endif %}
index 6b940fe..9ec05bf 100644
--- a/template/steps/promotion/{% if not metric_compare_promotion %}promote_latest.py{% endif %}
+++ b/template/steps/promotion/{% if not metric_compare_promotion %}promote_latest.py{% endif %}
@@ -1,13 +1,11 @@
 # {% include 'template/license_header' %}
 
 
-from zenml import step
+from zenml import get_step_context, step
 from zenml.client import Client
 from zenml.logger import get_logger
 from zenml.model_registries.base_model_registry import ModelVersionStage
 
-from config import MetaConfig
-
 logger = get_logger(__name__)
 
 model_registry = Client().active_stack.model_registry
@@ -28,22 +26,23 @@ def promote_latest(latest_version:str, current_version:str):
 
     ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ###
     logger.info(f"Promoting latest model version `{latest_version}`")
+    pipeline_extra = get_step_context().pipeline_run.config.extra
     if latest_version != current_version:
         model_registry.update_model_version(
-            name=MetaConfig.mlflow_model_name,
+            name=pipeline_extra["mlflow_model_name"],
             version=current_version,
             stage=ModelVersionStage.ARCHIVED,
             metadata={},
         )
     model_registry.update_model_version(
-        name=MetaConfig.mlflow_model_name,
+        name=pipeline_extra["mlflow_model_name"],
         version=latest_version,
-        stage=MetaConfig.target_env,
+        stage=ModelVersionStage(pipeline_extra["target_env"]),
         metadata={},
     )
     promoted_version = latest_version
     logger.info(
-        f"Current model version in `{MetaConfig.target_env.value}` is `{promoted_version}`"
+        f"Current model version in `{pipeline_extra['target_env']}` is `{promoted_version}`"
     )
     ### YOUR CODE ENDS HERE ###
diff --git a/template/steps/training/model_trainer.py b/template/steps/training/model_trainer.py
index b651fde..43eff16 100644
--- a/template/steps/training/model_trainer.py
+++ b/template/steps/training/model_trainer.py
@@ -5,7 +5,6 @@
 
 import mlflow
 import pandas as pd
-from artifacts.model_metadata import ModelMetadata
 from sklearn.base import ClassifierMixin
 from zenml import step
 from zenml.client import Client
@@ -28,7 +27,7 @@
 @step(experiment_tracker=experiment_tracker.name)
 def model_trainer(
     dataset_trn: pd.DataFrame,
-    model_config: ModelMetadata,
+    model: ClassifierMixin,
     target: str,
     random_seed: int = 42,
 ) -> Annotated[ClassifierMixin, "model"]:
@@ -55,7 +54,7 @@
 
     Args:
         dataset_trn: The preprocessed train dataset.
-        model_config: `ModelMetadata` to train on
+        model: The model instance to train.
         target: Name of target columns in dataset.
         random_seed: Fixed seed of random generator.
 
@@ -66,13 +65,5 @@
     ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ###
-    # Initialize the model with the hyperparameters indicated in the step
-    # parameters and train it on the training set.
+    # Train the model provided by the upstream step on the training set.
-    hyperparameters = model_config.params
-    model_class = model_config.model_class
-    if "random_seed" in model_class.__init__.__code__.co_varnames:
-        model = model_class(random_seed=random_seed, **hyperparameters)
-    else:
-        model = model_class(**hyperparameters)
-
     logger.info(f"Training model {model}...")
     mlflow.sklearn.autolog()
     model.fit(
diff --git a/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_select_best_model.py b/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_select_best_model.py
index 7af3fd0..0f0bcc3 100644
--- a/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_select_best_model.py
+++ b/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_select_best_model.py
@@ -3,8 +3,8 @@
 
 from typing_extensions import Annotated
 
-from artifacts.materializer import ModelMetadataMaterializer
-from artifacts.model_metadata import ModelMetadata
+from sklearn.base import ClassifierMixin
+
 from zenml import get_step_context, step
 from zenml.client import Client
 from zenml.logger import get_logger
@@ -12,10 +12,10 @@
 logger = get_logger(__name__)
 
 
-@step(output_materializers=ModelMetadataMaterializer)
+@step
 def hp_tuning_select_best_model(
     search_steps_prefix: str,
-) -> Annotated[ModelMetadata, "best_model"]:
+) -> Annotated[ClassifierMixin, "best_model"]:
     """Find best model across all HP tuning attempts.

     This is an example of a model hyperparameter tuning step that takes
@@ -33,12 +33,14 @@
     run = Client().get_pipeline_run(run_name)
 
     best_model = None
+    best_metric = -1
     for run_step_name, run_step in run.steps.items():
         if run_step_name.startswith(search_steps_prefix):
-            for output_name, output in run_step.outputs.items():
-                if output_name == "best_model":
-                    model: ModelMetadata = output.load()
-                    if best_model is None or best_model.metric < model.metric:
-                        best_model = model
+            if "best_model" in run_step.outputs:
+                model: ClassifierMixin = run_step.outputs["best_model"].load()
+                metric: float = run_step.outputs["metric"].load()
+                if best_model is None or best_metric < metric:
+                    best_model = model
+                    best_metric = metric
     ### YOUR CODE ENDS HERE ###
-    return (best_model or ModelMetadata(None))  # for types compatibility
+    return best_model
diff --git a/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_single_search.py b/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_single_search.py
index 10ade38..e246d4e 100644
--- a/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_single_search.py
+++ b/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_single_search.py
@@ -1,26 +1,30 @@
 # {% include 'template/license_header' %}
 
+from typing import Any, Dict, Tuple
 from typing_extensions import Annotated
 
 import pandas as pd
-from artifacts.materializer import ModelMetadataMaterializer
-from artifacts.model_metadata import ModelMetadata
+from sklearn.base import ClassifierMixin
 from sklearn.metrics import accuracy_score
 from sklearn.model_selection import RandomizedSearchCV
 from zenml import step
 from zenml.logger import get_logger
 
+from utils.get_model_from_config import get_model_from_config
+
 logger = get_logger(__name__)
 
 
-@step(output_materializers=ModelMetadataMaterializer)
+@step
 def hp_tuning_single_search(
-    model_metadata: ModelMetadata,
+    model_package: str,
+    model_class: str,
+    search_grid: Dict[str, Any],
     dataset_trn: pd.DataFrame,
     dataset_tst: pd.DataFrame,
     target: str,
-) -> Annotated[ModelMetadata, "best_model"]:
+) -> Tuple[Annotated[ClassifierMixin, "best_model"], Annotated[float, "metric"]]:
     """Evaluate a trained model.
 
     This is an example of a model hyperparameter tuning step that takes
@@ -35,7 +39,9 @@
     https://docs.zenml.io/user-guide/advanced-guide/configure-steps-pipelines
 
     Args:
-        model_metadata: `ModelMetadata` to search
+        model_package: The package containing the model to use for hyperparameter tuning.
+        model_class: The class of the model to use for hyperparameter tuning.
+        search_grid: The hyperparameter search space.
         dataset_trn: The train dataset.
         dataset_tst: The test dataset.
         target: Name of target columns in dataset.
@@ -43,6 +49,17 @@
 
     Returns:
-        The best possible model parameters for given config.
+        The best model and its metric for the given config.
""" + model_class = get_model_from_config(model_package, model_class) + + for search_key in search_grid: + if "range" in search_grid[search_key]: + search_grid[search_key] = range( + search_grid[search_key]["range"]["start"], + search_grid[search_key]["range"]["end"], + search_grid[search_key]["range"].get("step",1) + ) + ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### X_trn = dataset_trn.drop(columns=[target]) @@ -50,23 +66,18 @@ def hp_tuning_single_search( X_tst = dataset_tst.drop(columns=[target]) y_tst = dataset_tst[target] logger.info("Running Hyperparameter tuning...") - best_model = {"class": None, "params": None, "metric": -1} cv = RandomizedSearchCV( - estimator=model_metadata.model_class(), - param_distributions=model_metadata.search_grid, + estimator=model_class(), + param_distributions=search_grid, cv=3, n_jobs=-1, n_iter=10, random_state=42, scoring="accuracy", + refit=True, ) cv.fit(X=X_trn, y=y_trn) y_pred = cv.predict(X_tst) score = accuracy_score(y_tst, y_pred) - best_model = ModelMetadata( - model_metadata.model_class, - params=cv.best_params_, - metric=score, - ) ### YOUR CODE ENDS HERE ### - return best_model + return cv.best_estimator_, score diff --git a/template/utils/get_model_from_config.py b/template/utils/get_model_from_config.py new file mode 100644 index 0000000..72444d8 --- /dev/null +++ b/template/utils/get_model_from_config.py @@ -0,0 +1,17 @@ +# {% include 'template/license_header' %} + + +from sklearn.base import ClassifierMixin + + +def get_model_from_config(model_package: str, model_class: str) -> ClassifierMixin: + if model_package == "sklearn.ensemble": + import sklearn.ensemble as package + model_class = getattr(package, model_class) + elif model_package == "sklearn.tree": + import sklearn.tree as package + model_class = getattr(package, model_class) + else: + raise ValueError(f"Unsupported model package: {model_package}") + + return model_class \ No newline at end of file diff --git a/tests/test_template.py b/tests/test_template.py index 7f43d43..55ae0ea 100644 --- a/tests/test_template.py +++ b/tests/test_template.py @@ -78,19 +78,22 @@ def generate_and_run_project( worker.run_copy() # MLFlow Deployer not supported on Windows - if platform.system().lower()!="windows": + # MLFlow `service daemon is not running` error on MacOS + if platform.system().lower() not in ["windows", "macos", "darwin"]: # run the project call = [sys.executable, "run.py"] try: - subprocess.check_call( + subprocess.check_output( call, cwd=str(dst_path), env=os.environ.copy(), + stderr=subprocess.STDOUT, ) - except Exception as e: + except subprocess.CalledProcessError as e: raise RuntimeError( - f"Failed to run project generated with parameters: {answers}" + f"Failed to run project generated with parameters: {answers}\n" + f"{e.output.decode()}" ) from e # check the pipeline run is successful