From 6012633f9ea430d5fc422ea10485568b44109597 Mon Sep 17 00:00:00 2001 From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com> Date: Mon, 2 Oct 2023 08:53:01 +0200 Subject: [PATCH 01/17] fix ExternalArtifact behavior --- template/pipelines/batch_inference.py | 2 +- template/pipelines/training.py | 2 +- template/run.py | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/template/pipelines/batch_inference.py b/template/pipelines/batch_inference.py index b19c3a7..8f9c26c 100644 --- a/template/pipelines/batch_inference.py +++ b/template/pipelines/batch_inference.py @@ -20,7 +20,7 @@ mlflow_model_registry_deployer_step, ) from zenml.logger import get_logger -from zenml.steps.external_artifact import ExternalArtifact +from zenml.artifacts.external_artifact import ExternalArtifact logger = get_logger(__name__) diff --git a/template/pipelines/training.py b/template/pipelines/training.py index 8939914..d80083d 100644 --- a/template/pipelines/training.py +++ b/template/pipelines/training.py @@ -31,7 +31,7 @@ ) from zenml.integrations.mlflow.steps.mlflow_registry import mlflow_register_model_step from zenml.logger import get_logger -from zenml.steps.external_artifact import ExternalArtifact +from zenml.artifacts.external_artifact import ExternalArtifact logger = get_logger(__name__) diff --git a/template/run.py b/template/run.py index 562fb91..49fc089 100644 --- a/template/run.py +++ b/template/run.py @@ -1,7 +1,7 @@ # {% include 'template/license_header' %} -from zenml.steps.external_artifact import ExternalArtifact +from zenml.artifacts.external_artifact import ExternalArtifact from zenml.logger import get_logger from pipelines import {{product_name}}_batch_inference, {{product_name}}_training from config import MetaConfig @@ -177,7 +177,7 @@ def main( logger.info( "Batch inference pipeline finished successfully! " "You can find predictions in Artifact Store using ID: " - f"`{str(artifact.upload_if_necessary())}`." + f"`{str(artifact.id)}`." 
) From 6185729058589fea199ed7dc3d624f12de3a846f Mon Sep 17 00:00:00 2001 From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com> Date: Mon, 2 Oct 2023 08:54:24 +0200 Subject: [PATCH 02/17] update zenml-ref --- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0a81ca4..7de3135 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,3 +35,4 @@ jobs: with: stack-name: ${{ matrix.stack-name }} python-version: ${{ matrix.python-version }} + zenml-ref: feature/OSS-2424-extend-external-artifact From 8ae5d7bf736c8babc22aef6976bd6300adb0bebc Mon Sep 17 00:00:00 2001 From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com> Date: Mon, 2 Oct 2023 08:57:57 +0200 Subject: [PATCH 03/17] fix ref name --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7de3135..d7e512c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,4 +35,4 @@ jobs: with: stack-name: ${{ matrix.stack-name }} python-version: ${{ matrix.python-version }} - zenml-ref: feature/OSS-2424-extend-external-artifact + ref-zenml: feature/OSS-2424-extend-external-artifact From e664297b52ffae8e093410ce353ed218d34fa7d1 Mon Sep 17 00:00:00 2001 From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com> Date: Wed, 11 Oct 2023 11:54:34 +0200 Subject: [PATCH 04/17] YAML based template configuration --- .github/workflows/ci.yml | 2 +- template/artifacts/__init__.py | 1 - template/artifacts/materializer.py | 89 ------------------- template/artifacts/model_metadata.py | 62 ------------- template/config.py | 87 ------------------ template/config.yaml | 78 ++++++++++++++++ template/pipelines/batch_inference.py | 15 ++-- template/pipelines/training.py | 44 +++++---- template/run.py | 24 +++-- .../inference_get_current_version.py | 12 +-- .../steps/promotion/promote_get_versions.py | 11 +-- ...mote_metric_compare_promoter.py{% endif %} | 12 +-- ...e_promotion %}promote_latest.py{% endif %} | 15 ++-- template/steps/training/model_trainer.py | 12 +-- .../hp_tuning_select_best_model.py | 21 ++--- .../hp_tuning_single_search.py | 41 +++++---- template/utils/get_model_from_config.py | 17 ++++ 17 files changed, 205 insertions(+), 338 deletions(-) delete mode 100644 template/artifacts/__init__.py delete mode 100644 template/artifacts/materializer.py delete mode 100644 template/artifacts/model_metadata.py delete mode 100644 template/config.py create mode 100644 template/config.yaml create mode 100644 template/utils/get_model_from_config.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d7e512c..9cb8ec5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,4 +35,4 @@ jobs: with: stack-name: ${{ matrix.stack-name }} python-version: ${{ matrix.python-version }} - ref-zenml: feature/OSS-2424-extend-external-artifact + ref-zenml: feature/OSS-2494-evaluate-config-early diff --git a/template/artifacts/__init__.py b/template/artifacts/__init__.py deleted file mode 100644 index 4bc11e5..0000000 --- a/template/artifacts/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# {% include 'template/license_header' %} diff --git a/template/artifacts/materializer.py b/template/artifacts/materializer.py deleted file mode 100644 index 40c89a3..0000000 --- a/template/artifacts/materializer.py +++ /dev/null @@ -1,89 +0,0 @@ -# {% include 'template/license_header' %} - - -import json 
-import os -from typing import Type - -from zenml.enums import ArtifactType -from zenml.io import fileio -from zenml.materializers.base_materializer import BaseMaterializer - -from artifacts.model_metadata import ModelMetadata - - -class ModelMetadataMaterializer(BaseMaterializer): - ASSOCIATED_TYPES = (ModelMetadata,) - ASSOCIATED_ARTIFACT_TYPE = ArtifactType.STATISTICS - - def load(self, data_type: Type[ModelMetadata]) -> ModelMetadata: - """Read from artifact store. - - Args: - data_type: What type the artifact data should be loaded as. - - Raises: - ValueError: on deserialization issue - - Returns: - Read artifact. - """ - super().load(data_type) - - ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### - import sklearn.ensemble - import sklearn.linear_model - import sklearn.tree - - modules = [sklearn.ensemble, sklearn.linear_model, sklearn.tree] - - with fileio.open(os.path.join(self.uri, "data.json"), "r") as f: - data_json = json.loads(f.read()) - class_name = data_json["model_class"] - cls = None - for module in modules: - if cls := getattr(module, class_name, None): - break - if cls is None: - raise ValueError( - f"Cannot deserialize `{class_name}` using {self.__class__.__name__}. " - f"Only classes from modules {[m.__name__ for m in modules]} " - "are supported" - ) - data = ModelMetadata(cls) - if "search_grid" in data_json: - data.search_grid = data_json["search_grid"] - if "params" in data_json: - data.params = data_json["params"] - if "metric" in data_json: - data.metric = data_json["metric"] - ### YOUR CODE ENDS HERE ### - - return data - - def save(self, data: ModelMetadata) -> None: - """Write to artifact store. - - Args: - data: The data of the artifact to save. - """ - super().save(data) - - ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### - # Dump the model metadata directly into the artifact store as a JSON file - data_json = dict() - with fileio.open(os.path.join(self.uri, "data.json"), "w") as f: - data_json["model_class"] = data.model_class.__name__ - if data.search_grid: - data_json["search_grid"] = {} - for k, v in data.search_grid.items(): - if type(v) == range: - data_json["search_grid"][k] = list(v) - else: - data_json["search_grid"][k] = v - if data.params: - data_json["params"] = data.params - if data.metric: - data_json["metric"] = data.metric - f.write(json.dumps(data_json)) - ### YOUR CODE ENDS HERE ### diff --git a/template/artifacts/model_metadata.py b/template/artifacts/model_metadata.py deleted file mode 100644 index d3e6ea6..0000000 --- a/template/artifacts/model_metadata.py +++ /dev/null @@ -1,62 +0,0 @@ -# {% include 'template/license_header' %} - - -from typing import Any, Dict - -from sklearn.base import ClassifierMixin - - -class ModelMetadata: - """A custom artifact that stores model metadata. - - A model metadata object gathers together information that is collected - about the model being trained in a training pipeline run. This data type - is used for one of the artifacts returned by the model evaluation step. - - This is an example of a *custom artifact data type*: a type returned by - one of the pipeline steps that isn't natively supported by the ZenML - framework. Custom artifact data types are a common occurrence in ZenML, - usually encountered in one of the following circumstances: - - - you use a third party library that is not covered as a ZenML integration - and you model one or more step artifacts from the data types provided by - this library (e.g. 
datasets, models, data validation profiles, model - evaluation results/reports etc.) - - you need to use one of your own data types as a step artifact and it is - not one of the basic Python artifact data types supported by the ZenML - framework (e.g. str, int, float, dictionaries, lists, etc.) - - you want to extend one of the artifact data types already natively - supported by ZenML (e.g. pandas.DataFrame or sklearn.ClassifierMixin) - to customize it with your own data and/or behavior. - - In all above cases, the ZenML framework lacks one very important piece of - information: it doesn't "know" how to convert the data into a format that - can be saved in the artifact store (e.g. on a filesystem or persistent - storage service like S3 or GCS). Saving and loading artifacts from the - artifact store is something called "materialization" in ZenML terms and - you need to provide this missing information in the form of a custom - materializer - a class that implements loading/saving artifacts from/to - the artifact store. Take a look at the `materializers` folder to see how a - custom materializer is implemented for this artifact data type. - - More information about custom step artifact data types and ZenML - materializers is available in the docs: - - https://docs.zenml.io/user-guide/advanced-guide/artifact-management/handle-custom-data-types - - """ - - ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### - def __init__( - self, - model_class: ClassifierMixin, - search_grid: Dict[str, Any] = None, - params: Dict[str, Any] = None, - metric: float = None, - ) -> None: - self.model_class = model_class - self.search_grid = search_grid - self.params = params - self.metric = metric - - ### YOUR CODE ENDS HERE ### diff --git a/template/config.py b/template/config.py deleted file mode 100644 index f08b7fc..0000000 --- a/template/config.py +++ /dev/null @@ -1,87 +0,0 @@ -# {% include 'template/license_header' %} - - -from artifacts.model_metadata import ModelMetadata -from pydantic import BaseConfig -from sklearn.ensemble import RandomForestClassifier -from sklearn.tree import DecisionTreeClassifier -from zenml.config import DockerSettings -from zenml.integrations.constants import ( - AWS, -{%- if data_quality_checks %} - EVIDENTLY, -{%- endif %} - KUBEFLOW, - KUBERNETES, - MLFLOW, - SKLEARN, - SLACK, -) -from zenml.model_registries.base_model_registry import ModelVersionStage - -PIPELINE_SETTINGS = dict( - docker=DockerSettings( - required_integrations=[ - AWS, -{%- if data_quality_checks %} - EVIDENTLY, -{%- endif %} - KUBEFLOW, - KUBERNETES, - MLFLOW, - SKLEARN, - SLACK, - ], - ) -) - -DEFAULT_PIPELINE_EXTRAS = dict( - notify_on_success={{notify_on_successes}}, - notify_on_failure={{notify_on_failures}} -) - - -class MetaConfig(BaseConfig): - pipeline_name_training = "{{product_name}}_training" - pipeline_name_batch_inference = "{{product_name}}_batch_inference" - mlflow_model_name = "{{product_name}}_model" -{%- if target_environment == 'production' %} - target_env = ModelVersionStage.PRODUCTION -{%- else %} - target_env = ModelVersionStage.STAGING -{%- endif %} - - ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### -{%- if hyperparameters_tuning %} - # This set contains all the models that you want to evaluate - # during hyperparameter tuning stage. 
- model_search_space = { - ModelMetadata( - RandomForestClassifier, - search_grid=dict( - criterion=["gini", "entropy"], - max_depth=[2, 4, 6, 8, 10, 12], - min_samples_leaf=range(1, 10), - n_estimators=range(50, 500, 25), - ), - ), - ModelMetadata( - DecisionTreeClassifier, - search_grid=dict( - criterion=["gini", "entropy"], - max_depth=[2, 4, 6, 8, 10, 12], - min_samples_leaf=range(1, 10), - ), - ), - } -{%- else %} - # This model configuration will be used for the training stage. - model_configuration = ModelMetadata( - DecisionTreeClassifier, - params=dict( - criterion="gini", - max_depth=5, - min_samples_leaf=3, - ), - ) -{%- endif %} diff --git a/template/config.yaml b/template/config.yaml new file mode 100644 index 0000000..030be20 --- /dev/null +++ b/template/config.yaml @@ -0,0 +1,78 @@ +# {% include 'template/license_header' %} + +settings: + docker: + requirements: + - aws +{%- if data_quality_checks %} + - evidently +{%- endif %} + - kubeflow + - kubernetes + - mlflow + - sklearn + - slack +extra: + mlflow_model_name: e2e_use_case_model +{%- if target_environment == 'production' %} + target_env: Production +{%- else %} + target_env: Staging +{%- endif %} + notify_on_success: False + notify_on_failure: True +{%- if hyperparameters_tuning %} + # This set contains all the models that you want to evaluate + # during hyperparameter tuning stage. + model_search_space: + random_forest: + model_package: sklearn.ensemble + model_class: RandomForestClassifier + search_grid: + criterion: + - gini + - entropy + max_depth: + - 2 + - 4 + - 6 + - 8 + - 10 + - 12 + min_samples_leaf: + range: + start: 1 + end: 10 + n_estimators: + range: + start: 50 + end: 500 + step: 25 + decision_tree: + model_package: sklearn.tree + model_class: DecisionTreeClassifier + search_grid: + criterion: + - gini + - entropy + max_depth: + - 2 + - 4 + - 6 + - 8 + - 10 + - 12 + min_samples_leaf: + range: + start: 1 + end: 10 +{%- else %} + # This model configuration will be used for the training stage. + model_configuration: + model_package: sklearn.tree + model_class: DecisionTreeClassifier + params: + criterion: gini + max_depth: 5 + min_samples_leaf: 3 +{%- endif %} \ No newline at end of file diff --git a/template/pipelines/batch_inference.py b/template/pipelines/batch_inference.py index 8f9c26c..127cfb2 100644 --- a/template/pipelines/batch_inference.py +++ b/template/pipelines/batch_inference.py @@ -1,7 +1,6 @@ # {% include 'template/license_header' %} -from config import DEFAULT_PIPELINE_EXTRAS, PIPELINE_SETTINGS, MetaConfig from steps import ( data_loader, {%- if data_quality_checks %} @@ -13,7 +12,7 @@ notify_on_failure, notify_on_success, ) -from zenml import pipeline +from zenml import get_pipeline_context, pipeline from zenml.integrations.evidently.metrics import EvidentlyMetricConfig from zenml.integrations.evidently.steps import evidently_report_step from zenml.integrations.mlflow.steps.mlflow_deployer import ( @@ -25,11 +24,7 @@ logger = get_logger(__name__) -@pipeline( - settings=PIPELINE_SETTINGS, - on_failure=notify_on_failure, - extra=DEFAULT_PIPELINE_EXTRAS, -) +@pipeline(on_failure=notify_on_failure) def {{product_name}}_batch_inference(): """ Model batch inference pipeline. 
@@ -45,7 +40,7 @@ def {{product_name}}_batch_inference(): df_inference = inference_data_preprocessor( dataset_inf=df_inference, preprocess_pipeline=ExternalArtifact( - pipeline_name=MetaConfig.pipeline_name_training, + pipeline_name="{{product_name}}_training", artifact_name="preprocess_pipeline", ), target=target, @@ -55,7 +50,7 @@ def {{product_name}}_batch_inference(): ########## DataQuality stage ########## report, _ = evidently_report_step( reference_dataset=ExternalArtifact( - pipeline_name=MetaConfig.pipeline_name_training, + pipeline_name="{{product_name}}_training", artifact_name="dataset_trn", ), comparison_dataset=df_inference, @@ -69,7 +64,7 @@ def {{product_name}}_batch_inference(): ########## Inference stage ########## registry_model_version = inference_get_current_version() deployment_service = mlflow_model_registry_deployer_step( - registry_model_name=MetaConfig.mlflow_model_name, + registry_model_name=get_pipeline_context().extra["mlflow_model_name"], registry_model_version=registry_model_version, replace_existing=False, ) diff --git a/template/pipelines/training.py b/template/pipelines/training.py index d80083d..67e7cd0 100644 --- a/template/pipelines/training.py +++ b/template/pipelines/training.py @@ -3,8 +3,6 @@ from typing import List, Optional -from artifacts.materializer import ModelMetadataMaterializer -from config import DEFAULT_PIPELINE_EXTRAS, PIPELINE_SETTINGS, MetaConfig from steps import ( data_loader, {%- if hyperparameters_tuning %} @@ -25,23 +23,24 @@ train_data_preprocessor, train_data_splitter, ) -from zenml import pipeline +from zenml import pipeline, get_pipeline_context from zenml.integrations.mlflow.steps.mlflow_deployer import ( mlflow_model_registry_deployer_step, ) from zenml.integrations.mlflow.steps.mlflow_registry import mlflow_register_model_step from zenml.logger import get_logger +{%- if hyperparameters_tuning %} + +{%- else %} from zenml.artifacts.external_artifact import ExternalArtifact +from utils.get_model_from_config import get_model_from_config +{%- endif %} logger = get_logger(__name__) -@pipeline( - settings=PIPELINE_SETTINGS, - on_failure=notify_on_failure, - extra=DEFAULT_PIPELINE_EXTRAS, -) +@pipeline(on_failure=notify_on_failure) def {{product_name}}_training( test_size: float = 0.2, drop_na: Optional[bool] = None, @@ -74,6 +73,7 @@ def {{product_name}}_training( ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### # Link all the steps together by calling them and passing the output # of one step as the input of the next step. 
+ pipeline_extra = get_pipeline_context().extra ########## ETL stage ########## raw_data, target = data_loader() dataset_trn, dataset_tst = train_data_splitter( @@ -92,30 +92,36 @@ def {{product_name}}_training( ########## Hyperparameter tuning stage ########## after = [] search_steps_prefix = "hp_tuning_search_" - for i, model_search_configuration in enumerate(MetaConfig.model_search_space): - step_name = f"{search_steps_prefix}{i}" + for config_name,model_search_configuration in pipeline_extra["model_search_space"].items(): + step_name = f"{search_steps_prefix}{config_name}" hp_tuning_single_search( - model_metadata=ExternalArtifact( - value=model_search_configuration, - ), id=step_name, + model_package = model_search_configuration["model_package"], + model_class = model_search_configuration["model_class"], + search_grid = model_search_configuration["search_grid"], dataset_trn=dataset_trn, dataset_tst=dataset_tst, target=target, ) after.append(step_name) - best_model_config = hp_tuning_select_best_model( + best_model = hp_tuning_select_best_model( search_steps_prefix=search_steps_prefix, after=after ) +{%- else %} + model_configuration = pipeline_extra["model_configuration"] + best_model = get_model_from_config( + model_package=model_configuration["model_package"], + model_class=model_configuration["model_class"], + )(**model_configuration["params"]) {%- endif %} ########## Training stage ########## model = model_trainer( dataset_trn=dataset_trn, {%- if hyperparameters_tuning %} - model_config=best_model_config, + model=best_model, {%- else %} - model_config=ExternalArtifact(value=MetaConfig.model_configuration, materializer=ModelMetadataMaterializer), + model=ExternalArtifact(value=best_model), {%- endif %} random_seed=random_seed, target=target, @@ -131,7 +137,7 @@ def {{product_name}}_training( ) mlflow_register_model_step( model, - name=MetaConfig.mlflow_model_name, + name=pipeline_extra["mlflow_model_name"], ) ########## Promotion stage ########## @@ -141,7 +147,7 @@ def {{product_name}}_training( {%- if metric_compare_promotion %} latest_deployment = mlflow_model_registry_deployer_step( id="deploy_latest_model_version", - registry_model_name=MetaConfig.mlflow_model_name, + registry_model_name=pipeline_extra["mlflow_model_name"], registry_model_version=latest_version, replace_existing=False, ) @@ -153,7 +159,7 @@ def {{product_name}}_training( current_deployment = mlflow_model_registry_deployer_step( id="deploy_current_model_version", - registry_model_name=MetaConfig.mlflow_model_name, + registry_model_name=pipeline_extra["mlflow_model_name"], registry_model_version=current_version, replace_existing=False, after=["get_metrics_latest_model_version"], diff --git a/template/run.py b/template/run.py index 49fc089..4be9b6d 100644 --- a/template/run.py +++ b/template/run.py @@ -1,13 +1,14 @@ # {% include 'template/license_header' %} +import click +from datetime import datetime as dt +import os +from typing import Optional from zenml.artifacts.external_artifact import ExternalArtifact from zenml.logger import get_logger + from pipelines import {{product_name}}_batch_inference, {{product_name}}_training -from config import MetaConfig -import click -from typing import Optional -from datetime import datetime as dt logger = get_logger(__name__) @@ -139,7 +140,12 @@ def main( # Run a pipeline with the required parameters. This executes # all steps in the pipeline in the correct order using the orchestrator # stack component that is configured in your active ZenML stack. 
- pipeline_args = {} + pipeline_args = { + "config_path":os.path.join( + os.path.dirname(os.path.realpath(__file__)), + "config.yaml", + ) + } if no_cache: pipeline_args["enable_cache"] = False @@ -159,7 +165,7 @@ def main( pipeline_args[ "run_name" - ] = f"{MetaConfig.pipeline_name_training}_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}" + ] = f"{{product_name}}_training_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}" {{product_name}}_training.with_options(**pipeline_args)(**run_args_train) logger.info("Training pipeline finished successfully!") @@ -167,17 +173,17 @@ def main( run_args_inference = {} pipeline_args[ "run_name" - ] = f"{MetaConfig.pipeline_name_batch_inference}_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}" + ] = f"{{product_name}}_batch_inference_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}" {{product_name}}_batch_inference.with_options(**pipeline_args)(**run_args_inference) artifact = ExternalArtifact( - pipeline_name=MetaConfig.pipeline_name_batch_inference, + pipeline_name="{{product_name}}_batch_inference", artifact_name="predictions", ) logger.info( "Batch inference pipeline finished successfully! " "You can find predictions in Artifact Store using ID: " - f"`{str(artifact.id)}`." + f"`{str(artifact.get_artifact_id())}`." ) diff --git a/template/steps/inference/inference_get_current_version.py b/template/steps/inference/inference_get_current_version.py index 0454d87..931f7b4 100644 --- a/template/steps/inference/inference_get_current_version.py +++ b/template/steps/inference/inference_get_current_version.py @@ -3,10 +3,10 @@ from typing_extensions import Annotated -from config import MetaConfig -from zenml import step +from zenml import step, get_step_context from zenml.client import Client from zenml.logger import get_logger +from zenml.model_registries.base_model_registry import ModelVersionStage logger = get_logger(__name__) @@ -22,13 +22,13 @@ def inference_get_current_version() -> Annotated[str, "model_version"]: """ ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### - + pipeline_extra = get_step_context().pipeline_run.config.extra current_version = model_registry.list_model_versions( - name=MetaConfig.mlflow_model_name, - stage=MetaConfig.target_env, + name=pipeline_extra["mlflow_model_name"], + stage=ModelVersionStage(pipeline_extra["target_env"]), )[0].version logger.info( - f"Current model version in `{MetaConfig.target_env.value}` is `{current_version}`" + f"Current model version in `{pipeline_extra['target_env']}` is `{current_version}`" ) return current_version diff --git a/template/steps/promotion/promote_get_versions.py b/template/steps/promotion/promote_get_versions.py index 52d3529..bb5ff3c 100644 --- a/template/steps/promotion/promote_get_versions.py +++ b/template/steps/promotion/promote_get_versions.py @@ -4,10 +4,10 @@ from typing import Tuple from typing_extensions import Annotated -from config import MetaConfig -from zenml import step +from zenml import get_step_context, step from zenml.client import Client from zenml.logger import get_logger +from zenml.model_registries.base_model_registry import ModelVersionStage logger = get_logger(__name__) @@ -30,16 +30,17 @@ def promote_get_versions() -> ( """ ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### + pipeline_extra = get_step_context().pipeline_run.config.extra none_versions = model_registry.list_model_versions( - name=MetaConfig.mlflow_model_name, + name=pipeline_extra["mlflow_model_name"], stage=None, ) latest_versions = none_versions[0].version logger.info(f"Latest model version is 
{latest_versions}") target_versions = model_registry.list_model_versions( - name=MetaConfig.mlflow_model_name, - stage=MetaConfig.target_env, + name=pipeline_extra["mlflow_model_name"], + stage=ModelVersionStage(pipeline_extra["target_env"]), ) current_version = latest_versions if target_versions: diff --git a/template/steps/promotion/{% if metric_compare_promotion %}promote_metric_compare_promoter.py{% endif %} b/template/steps/promotion/{% if metric_compare_promotion %}promote_metric_compare_promoter.py{% endif %} index 045f1f7..d055885 100644 --- a/template/steps/promotion/{% if metric_compare_promotion %}promote_metric_compare_promoter.py{% endif %} +++ b/template/steps/promotion/{% if metric_compare_promotion %}promote_metric_compare_promoter.py{% endif %} @@ -1,8 +1,7 @@ # {% include 'template/license_header' %} -from config import MetaConfig -from zenml import step +from zenml import get_step_context, step from zenml.client import Client from zenml.logger import get_logger from zenml.model_registries.base_model_registry import ModelVersionStage @@ -46,6 +45,7 @@ def promote_metric_compare_promoter( """ ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### + pipeline_extra = get_step_context().pipeline_run.config.extra should_promote = True if latest_version == current_version: @@ -69,18 +69,18 @@ def promote_metric_compare_promoter( if should_promote: if latest_version != current_version: model_registry.update_model_version( - name=MetaConfig.mlflow_model_name, + name=pipeline_extra["mlflow_model_name"], version=current_version, stage=ModelVersionStage.ARCHIVED, ) model_registry.update_model_version( - name=MetaConfig.mlflow_model_name, + name=pipeline_extra["mlflow_model_name"], version=latest_version, - stage=MetaConfig.target_env, + stage=ModelVersionStage(pipeline_extra["target_env"]), ) promoted_version = latest_version logger.info( - f"Current model version in `{MetaConfig.target_env.value}` is `{promoted_version}`" + f"Current model version in `{pipeline_extra['target_env']}` is `{promoted_version}`" ) ### YOUR CODE ENDS HERE ### diff --git a/template/steps/promotion/{% if not metric_compare_promotion %}promote_latest.py{% endif %} b/template/steps/promotion/{% if not metric_compare_promotion %}promote_latest.py{% endif %} index 6b940fe..9ec05bf 100644 --- a/template/steps/promotion/{% if not metric_compare_promotion %}promote_latest.py{% endif %} +++ b/template/steps/promotion/{% if not metric_compare_promotion %}promote_latest.py{% endif %} @@ -1,13 +1,11 @@ # {% include 'template/license_header' %} -from zenml import step +from zenml import get_step_context, step from zenml.client import Client from zenml.logger import get_logger from zenml.model_registries.base_model_registry import ModelVersionStage -from config import MetaConfig - logger = get_logger(__name__) model_registry = Client().active_stack.model_registry @@ -28,22 +26,23 @@ def promote_latest(latest_version:str, current_version:str): ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### logger.info(f"Promoting latest model version `{latest_version}`") + pipeline_extra = get_step_context().pipeline_run.config.extra if latest_version != current_version: model_registry.update_model_version( - name=MetaConfig.mlflow_model_name, + name=pipeline_extra["mlflow_model_name"], version=current_version, - stage=ModelVersionStage.ARCHIVED, + stage=ModelVersionStage(ModelVersionStage.ARCHIVED), metadata={}, ) model_registry.update_model_version( - name=MetaConfig.mlflow_model_name, + 
name=pipeline_extra["mlflow_model_name"], version=latest_version, - stage=MetaConfig.target_env, + stage=ModelVersionStage(pipeline_extra["target_env"]), metadata={}, ) promoted_version = latest_version logger.info( - f"Current model version in `{MetaConfig.target_env.value}` is `{promoted_version}`" + f"Current model version in `{pipeline_extra['target_env']}` is `{promoted_version}`" ) ### YOUR CODE ENDS HERE ### diff --git a/template/steps/training/model_trainer.py b/template/steps/training/model_trainer.py index b651fde..43eff16 100644 --- a/template/steps/training/model_trainer.py +++ b/template/steps/training/model_trainer.py @@ -5,7 +5,6 @@ import mlflow import pandas as pd -from artifacts.model_metadata import ModelMetadata from sklearn.base import ClassifierMixin from zenml import step from zenml.client import Client @@ -28,7 +27,7 @@ @step(experiment_tracker=experiment_tracker.name) def model_trainer( dataset_trn: pd.DataFrame, - model_config: ModelMetadata, + model: ClassifierMixin, target: str, random_seed: int = 42, ) -> Annotated[ClassifierMixin, "model"]: @@ -55,7 +54,7 @@ def model_trainer( Args: dataset_trn: The preprocessed train dataset. - model_config: `ModelMetadata` to train on + model: The model instance to train. target: Name of target columns in dataset. random_seed: Fixed seed of random generator. @@ -66,13 +65,6 @@ def model_trainer( ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### # Initialize the model with the hyperparameters indicated in the step # parameters and train it on the training set. - hyperparameters = model_config.params - model_class = model_config.model_class - if "random_seed" in model_class.__init__.__code__.co_varnames: - model = model_class(random_seed=random_seed, **hyperparameters) - else: - model = model_class(**hyperparameters) - logger.info(f"Training model {model}...") mlflow.sklearn.autolog() model.fit( diff --git a/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_select_best_model.py b/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_select_best_model.py index 7af3fd0..0f0bcc3 100644 --- a/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_select_best_model.py +++ b/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_select_best_model.py @@ -3,8 +3,8 @@ from typing_extensions import Annotated -from artifacts.materializer import ModelMetadataMaterializer -from artifacts.model_metadata import ModelMetadata +from sklearn.base import ClassifierMixin + from zenml import get_step_context, step from zenml.client import Client from zenml.logger import get_logger @@ -12,10 +12,10 @@ logger = get_logger(__name__) -@step(output_materializers=ModelMetadataMaterializer) +@step def hp_tuning_select_best_model( search_steps_prefix: str, -) -> Annotated[ModelMetadata, "best_model"]: +) -> Annotated[ClassifierMixin, "best_model"]: """Find best model across all HP tuning attempts. 
 This is an example of a model hyperparameter tuning step that takes
@@ -33,12 +33,13 @@ def hp_tuning_select_best_model(
     run = Client().get_pipeline_run(run_name)
 
     best_model = None
+    best_metric = -1
     for run_step_name, run_step in run.steps.items():
         if run_step_name.startswith(search_steps_prefix):
-            for output_name, output in run_step.outputs.items():
-                if output_name == "best_model":
-                    model: ModelMetadata = output.load()
-                    if best_model is None or best_model.metric < model.metric:
-                        best_model = model
+            if "best_model" in run_step.outputs:
+                model: ClassifierMixin = run_step.outputs["best_model"].load()
+                metric: float = run_step.outputs["metric"].load()
+                if best_model is None or best_metric < metric:
+                    best_model, best_metric = model, metric
     ### YOUR CODE ENDS HERE ###
-    return (best_model or ModelMetadata(None)) # for types compatibility
+    return best_model
diff --git a/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_single_search.py b/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_single_search.py
index 10ade38..e246d4e 100644
--- a/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_single_search.py
+++ b/template/steps/{% if hyperparameters_tuning %}hp_tuning{% endif %}/hp_tuning_single_search.py
@@ -1,26 +1,30 @@
 # {% include 'template/license_header' %}
 
 
+from typing import Any, Dict, Tuple
 from typing_extensions import Annotated
 
 import pandas as pd
-from artifacts.materializer import ModelMetadataMaterializer
-from artifacts.model_metadata import ModelMetadata
+from sklearn.base import ClassifierMixin
 from sklearn.metrics import accuracy_score
 from sklearn.model_selection import RandomizedSearchCV
 from zenml import step
 from zenml.logger import get_logger
 
+from utils.get_model_from_config import get_model_from_config
+
 logger = get_logger(__name__)
 
 
-@step(output_materializers=ModelMetadataMaterializer)
+@step
 def hp_tuning_single_search(
-    model_metadata: ModelMetadata,
+    model_package: str,
+    model_class: str,
+    search_grid: Dict[str, Any],
     dataset_trn: pd.DataFrame,
     dataset_tst: pd.DataFrame,
     target: str,
-) -> Annotated[ModelMetadata, "best_model"]:
+) -> Tuple[Annotated[ClassifierMixin, "best_model"], Annotated[float, "metric"]]:
     """Evaluate a trained model.
 
     This is an example of a model hyperparameter tuning step that takes
@@ -35,7 +39,9 @@ def hp_tuning_single_search(
     https://docs.zenml.io/user-guide/advanced-guide/configure-steps-pipelines
 
     Args:
-        model_metadata: `ModelMetadata` to search
+        model_package: The package containing the model to use for hyperparameter tuning.
+        model_class: The class of the model to use for hyperparameter tuning.
+        search_grid: The hyperparameter search space.
         dataset_trn: The train dataset.
         dataset_tst: The test dataset.
         target: Name of target columns in dataset.
@@ -43,6 +49,16 @@ def hp_tuning_single_search(
     Returns:
         The best possible model parameters for given config.
""" + model_class = get_model_from_config(model_package, model_class) + + for search_key in search_grid: + if "range" in search_grid[search_key]: + search_grid[search_key] = range( + search_grid[search_key]["range"]["start"], + search_grid[search_key]["range"]["end"], + search_grid[search_key]["range"].get("step",1) + ) + ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### X_trn = dataset_trn.drop(columns=[target]) @@ -50,23 +66,18 @@ def hp_tuning_single_search( X_tst = dataset_tst.drop(columns=[target]) y_tst = dataset_tst[target] logger.info("Running Hyperparameter tuning...") - best_model = {"class": None, "params": None, "metric": -1} cv = RandomizedSearchCV( - estimator=model_metadata.model_class(), - param_distributions=model_metadata.search_grid, + estimator=model_class(), + param_distributions=search_grid, cv=3, n_jobs=-1, n_iter=10, random_state=42, scoring="accuracy", + refit=True, ) cv.fit(X=X_trn, y=y_trn) y_pred = cv.predict(X_tst) score = accuracy_score(y_tst, y_pred) - best_model = ModelMetadata( - model_metadata.model_class, - params=cv.best_params_, - metric=score, - ) ### YOUR CODE ENDS HERE ### - return best_model + return cv.best_estimator_, score diff --git a/template/utils/get_model_from_config.py b/template/utils/get_model_from_config.py new file mode 100644 index 0000000..72444d8 --- /dev/null +++ b/template/utils/get_model_from_config.py @@ -0,0 +1,17 @@ +# {% include 'template/license_header' %} + + +from sklearn.base import ClassifierMixin + + +def get_model_from_config(model_package: str, model_class: str) -> ClassifierMixin: + if model_package == "sklearn.ensemble": + import sklearn.ensemble as package + model_class = getattr(package, model_class) + elif model_package == "sklearn.tree": + import sklearn.tree as package + model_class = getattr(package, model_class) + else: + raise ValueError(f"Unsupported model package: {model_package}") + + return model_class \ No newline at end of file From 074f0bce9d81396640eb22944796bb8ecd57757b Mon Sep 17 00:00:00 2001 From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com> Date: Wed, 11 Oct 2023 14:37:11 +0200 Subject: [PATCH 05/17] update docs --- .github/pull_request_template.md | 16 ++++++++++++++++ README.md | 2 +- 2 files changed, 17 insertions(+), 1 deletion(-) create mode 100644 .github/pull_request_template.md diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md new file mode 100644 index 0000000..ae71ac2 --- /dev/null +++ b/.github/pull_request_template.md @@ -0,0 +1,16 @@ +## Describe changes +I implemented/fixed _ to achieve _. + +## Pre-requisites +Please ensure you have done the following: +- [ ] If my change requires a change to docs, I have updated the documentation accordingly. +- [ ] I have added tests to cover my changes. 
- [ ] I have updated `ref-zenml` in `.github/workflows/ci.yml` accordingly (if unsure, `main` is a solid choice)

## Types of changes

- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)
- [ ] Other (add details above)

diff --git a/README.md b/README.md
index 1f7b41c..3f7c7f3 100644
--- a/README.md
+++ b/README.md
@@ -209,7 +209,7 @@ df_inference, target = data_loader(is_inference=True)
 df_inference = inference_data_preprocessor(
     dataset_inf=df_inference,
     preprocess_pipeline=ExternalArtifact(
-        pipeline_name=MetaConfig.pipeline_name_training,
+        pipeline_name="your_product_name_training",
         artifact_name="preprocess_pipeline",
     ),
     target=target,

From cc48de225ff046dde6e94c610468925efdbff794 Mon Sep 17 00:00:00 2001
From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com>
Date: Thu, 12 Oct 2023 08:15:12 +0200
Subject: [PATCH 06/17] fix config

---
 template/config.yaml | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/template/config.yaml b/template/config.yaml
index 030be20..bd4c34c 100644
--- a/template/config.yaml
+++ b/template/config.yaml
@@ -68,11 +68,11 @@ extra:
         end: 10
 {%- else %}
   # This model configuration will be used for the training stage.
-    model_configuration:
-      model_package: sklearn.tree
-      model_class: DecisionTreeClassifier
-      params:
-        criterion: gini
-        max_depth: 5
-        min_samples_leaf: 3
+  model_configuration:
+    model_package: sklearn.tree
+    model_class: DecisionTreeClassifier
+    params:
+      criterion: gini
+      max_depth: 5
+      min_samples_leaf: 3
 {%- endif %}
\ No newline at end of file

From 6fe8ecb32255efe7756b673bb37355cea69619ce Mon Sep 17 00:00:00 2001
From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com>
Date: Thu, 12 Oct 2023 10:48:46 +0200
Subject: [PATCH 07/17] verbose tests output

---
 tests/test_template.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/tests/test_template.py b/tests/test_template.py
index 7f43d43..19dccc7 100644
--- a/tests/test_template.py
+++ b/tests/test_template.py
@@ -83,14 +83,15 @@ def generate_and_run_project(
     call = [sys.executable, "run.py"]
 
     try:
-        subprocess.check_call(
+        output = subprocess.check_output(
             call,
             cwd=str(dst_path),
             env=os.environ.copy(),
         )
     except Exception as e:
         raise RuntimeError(
-            f"Failed to run project generated with parameters: {answers}"
+            f"Failed to run project generated with parameters: {answers}\n"
+            f"{output}"
         ) from e
 
     # check the pipeline run is successful

From c6f1eab8d032cfcf0e93351a160ca8aeab3ec04a Mon Sep 17 00:00:00 2001
From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com>
Date: Thu, 12 Oct 2023 10:52:16 +0200
Subject: [PATCH 08/17] verbose tests output

---
 tests/test_template.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tests/test_template.py b/tests/test_template.py
index 19dccc7..fd7c6ac 100644
--- a/tests/test_template.py
+++ b/tests/test_template.py
@@ -87,6 +87,7 @@ def generate_and_run_project(
             call,
             cwd=str(dst_path),
             env=os.environ.copy(),
+            stderr=subprocess.STDOUT,
         )
     except Exception as e:
         raise RuntimeError(

From 051470d272e8d7169c59ad799038d1f56b844e13 Mon Sep 17 00:00:00 2001
From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com>
Date: Thu, 12 Oct 2023 10:56:34 +0200
Subject: [PATCH 09/17] no fail fast

---
 .github/workflows/ci.yml | 1 +
 1 file changed, 1 insertion(+)

diff --git
a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9cb8ec5..82417f6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,6 +18,7 @@ jobs: run-tests: runs-on: ${{ matrix.os }} strategy: + fail-fast: false matrix: stack-name: [local] os: [windows-latest, ubuntu-latest, macos-latest] From 1377a8dd4792a7f910127955016510eb3ade0d08 Mon Sep 17 00:00:00 2001 From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com> Date: Thu, 12 Oct 2023 11:18:55 +0200 Subject: [PATCH 10/17] verbose tests output --- tests/test_template.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_template.py b/tests/test_template.py index fd7c6ac..cb8dbd7 100644 --- a/tests/test_template.py +++ b/tests/test_template.py @@ -83,16 +83,16 @@ def generate_and_run_project( call = [sys.executable, "run.py"] try: - output = subprocess.check_output( + subprocess.check_output( call, cwd=str(dst_path), env=os.environ.copy(), stderr=subprocess.STDOUT, ) - except Exception as e: + except subprocess.CalledProcessError as e: raise RuntimeError( f"Failed to run project generated with parameters: {answers}\n" - f"{output}" + f"{e.output}" ) from e # check the pipeline run is successful From 23f9358f4439e0f5bf2cf4d91eefda76b656f44e Mon Sep 17 00:00:00 2001 From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com> Date: Thu, 12 Oct 2023 11:53:06 +0200 Subject: [PATCH 11/17] bump timeouts for mlflow --- template/pipelines/batch_inference.py | 1 + template/pipelines/training.py | 2 ++ tests/test_template.py | 2 +- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/template/pipelines/batch_inference.py b/template/pipelines/batch_inference.py index 127cfb2..8d54fee 100644 --- a/template/pipelines/batch_inference.py +++ b/template/pipelines/batch_inference.py @@ -67,6 +67,7 @@ def {{product_name}}_batch_inference(): registry_model_name=get_pipeline_context().extra["mlflow_model_name"], registry_model_version=registry_model_version, replace_existing=False, + timeout=180, ) inference_predict( deployment_service=deployment_service, diff --git a/template/pipelines/training.py b/template/pipelines/training.py index 67e7cd0..8150b65 100644 --- a/template/pipelines/training.py +++ b/template/pipelines/training.py @@ -150,6 +150,7 @@ def {{product_name}}_training( registry_model_name=pipeline_extra["mlflow_model_name"], registry_model_version=latest_version, replace_existing=False, + timeout=180, ) latest_metric = promote_get_metric( id="get_metrics_latest_model_version", @@ -162,6 +163,7 @@ def {{product_name}}_training( registry_model_name=pipeline_extra["mlflow_model_name"], registry_model_version=current_version, replace_existing=False, + timeout=180, after=["get_metrics_latest_model_version"], ) current_metric = promote_get_metric( diff --git a/tests/test_template.py b/tests/test_template.py index cb8dbd7..9cccf68 100644 --- a/tests/test_template.py +++ b/tests/test_template.py @@ -92,7 +92,7 @@ def generate_and_run_project( except subprocess.CalledProcessError as e: raise RuntimeError( f"Failed to run project generated with parameters: {answers}\n" - f"{e.output}" + f"{e.output.decode()}" ) from e # check the pipeline run is successful From 7237b1670f038ec648ab99213bf6615027961e16 Mon Sep 17 00:00:00 2001 From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com> Date: Thu, 12 Oct 2023 12:18:03 +0200 Subject: [PATCH 12/17] try replace --- template/pipelines/batch_inference.py | 3 +-- template/pipelines/training.py | 6 ++---- 2 files 
changed, 3 insertions(+), 6 deletions(-) diff --git a/template/pipelines/batch_inference.py b/template/pipelines/batch_inference.py index 8d54fee..e777ee6 100644 --- a/template/pipelines/batch_inference.py +++ b/template/pipelines/batch_inference.py @@ -66,8 +66,7 @@ def {{product_name}}_batch_inference(): deployment_service = mlflow_model_registry_deployer_step( registry_model_name=get_pipeline_context().extra["mlflow_model_name"], registry_model_version=registry_model_version, - replace_existing=False, - timeout=180, + replace_existing=True, ) inference_predict( deployment_service=deployment_service, diff --git a/template/pipelines/training.py b/template/pipelines/training.py index 8150b65..bef85bf 100644 --- a/template/pipelines/training.py +++ b/template/pipelines/training.py @@ -149,8 +149,7 @@ def {{product_name}}_training( id="deploy_latest_model_version", registry_model_name=pipeline_extra["mlflow_model_name"], registry_model_version=latest_version, - replace_existing=False, - timeout=180, + replace_existing=True, ) latest_metric = promote_get_metric( id="get_metrics_latest_model_version", @@ -162,8 +161,7 @@ def {{product_name}}_training( id="deploy_current_model_version", registry_model_name=pipeline_extra["mlflow_model_name"], registry_model_version=current_version, - replace_existing=False, - timeout=180, + replace_existing=True, after=["get_metrics_latest_model_version"], ) current_metric = promote_get_metric( From e9c70092c7e296c701fdfb166700031b7447509d Mon Sep 17 00:00:00 2001 From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com> Date: Thu, 12 Oct 2023 13:27:31 +0200 Subject: [PATCH 13/17] testing inside python --- template/main.py | 95 +++++++++++++++++++++++++++++++ template/run.py | 126 +++++++++++------------------------------ tests/test_template.py | 20 ++----- 3 files changed, 132 insertions(+), 109 deletions(-) create mode 100644 template/main.py diff --git a/template/main.py b/template/main.py new file mode 100644 index 0000000..d827559 --- /dev/null +++ b/template/main.py @@ -0,0 +1,95 @@ +# {% include 'template/license_header' %} + +from datetime import datetime as dt +import os +from typing import Optional + +from zenml.artifacts.external_artifact import ExternalArtifact +from zenml.logger import get_logger + +from pipelines import {{product_name}}_batch_inference, {{product_name}}_training + +logger = get_logger(__name__) + + +def main( + no_cache: bool = False, + no_drop_na: bool = False, + no_normalize: bool = False, + drop_columns: Optional[str] = None, + test_size: float = 0.2, + min_train_accuracy: float = 0.8, + min_test_accuracy: float = 0.8, + fail_on_accuracy_quality_gates: bool = False, + only_inference: bool = False, +): + """Main entry point for the pipeline execution. + + This entrypoint is where everything comes together: + + * configuring pipeline with the required parameters + (some of which may come from command line arguments) + * launching the pipeline + + Args: + no_cache: If `True` cache will be disabled. + no_drop_na: If `True` NA values will not be dropped from the dataset. + no_normalize: If `True` normalization will not be done for the dataset. + drop_columns: List of comma-separated names of columns to drop from the dataset. + test_size: Percentage of records from the training dataset to go into the test dataset. + min_train_accuracy: Minimum acceptable accuracy on the train set. + min_test_accuracy: Minimum acceptable accuracy on the test set. 
+ fail_on_accuracy_quality_gates: If `True` and any of minimal accuracy + thresholds are violated - the pipeline will fail. If `False` thresholds will + not affect the pipeline. + only_inference: If `True` only inference pipeline will be triggered. + """ + + # Run a pipeline with the required parameters. This executes + # all steps in the pipeline in the correct order using the orchestrator + # stack component that is configured in your active ZenML stack. + pipeline_args = { + "config_path":os.path.join( + os.path.dirname(os.path.realpath(__file__)), + "config.yaml", + ) + } + if no_cache: + pipeline_args["enable_cache"] = False + + if not only_inference: + # Execute Training Pipeline + run_args_train = { + "drop_na": not no_drop_na, + "normalize": not no_normalize, + "random_seed": 42, + "test_size": test_size, + "min_train_accuracy": min_train_accuracy, + "min_test_accuracy": min_test_accuracy, + "fail_on_accuracy_quality_gates": fail_on_accuracy_quality_gates, + } + if drop_columns: + run_args_train["drop_columns"] = drop_columns.split(",") + + pipeline_args[ + "run_name" + ] = f"{{product_name}}_training_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}" + {{product_name}}_training.with_options(**pipeline_args)(**run_args_train) + logger.info("Training pipeline finished successfully!") + + # Execute Batch Inference Pipeline + run_args_inference = {} + pipeline_args[ + "run_name" + ] = f"{{product_name}}_batch_inference_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}" + {{product_name}}_batch_inference.with_options(**pipeline_args)(**run_args_inference) + + artifact = ExternalArtifact( + pipeline_name="{{product_name}}_batch_inference", + artifact_name="predictions", + ) + logger.info( + "Batch inference pipeline finished successfully! " + "You can find predictions in Artifact Store using ID: " + f"`{str(artifact.get_artifact_id())}`." + ) \ No newline at end of file diff --git a/template/run.py b/template/run.py index 4be9b6d..b97a880 100644 --- a/template/run.py +++ b/template/run.py @@ -1,14 +1,11 @@ # {% include 'template/license_header' %} import click -from datetime import datetime as dt -import os from typing import Optional -from zenml.artifacts.external_artifact import ExternalArtifact from zenml.logger import get_logger -from pipelines import {{product_name}}_batch_inference, {{product_name}}_training +from main import main logger = get_logger(__name__) @@ -23,28 +20,28 @@ Examples: - \b - # Run the pipeline with default options - python run.py - - \b - # Run the pipeline without cache - python run.py --no-cache +\b +# Run the pipeline with default options +python run.py + +\b +# Run the pipeline without cache +python run.py --no-cache - \b - # Run the pipeline without Hyperparameter tuning - python run.py --no-hp-tuning +\b +# Run the pipeline without Hyperparameter tuning +python run.py --no-hp-tuning - \b - # Run the pipeline without NA drop and normalization, - # but dropping columns [A,B,C] and keeping 10% of dataset - # as test set. - python run.py --no-drop-na --no-normalize --drop-columns A,B,C --test-size 0.1 +\b +# Run the pipeline without NA drop and normalization, +# but dropping columns [A,B,C] and keeping 10% of dataset +# as test set. +python run.py --no-drop-na --no-normalize --drop-columns A,B,C --test-size 0.1 - \b - # Run the pipeline with Quality Gate for accuracy set at 90% for train set - # and 85% for test set. If any of accuracies will be lower - pipeline will fail. 
- python run.py --min-train-accuracy 0.9 --min-test-accuracy 0.85 --fail-on-accuracy-quality-gates +\b +# Run the pipeline with Quality Gate for accuracy set at 90% for train set +# and 85% for test set. If any of accuracies will be lower - pipeline will fail. +python run.py --min-train-accuracy 0.9 --min-test-accuracy 0.85 --fail-on-accuracy-quality-gates """ @@ -104,7 +101,7 @@ default=False, help="Whether to run only inference pipeline.", ) -def main( +def main_click( no_cache: bool = False, no_drop_na: bool = False, no_normalize: bool = False, @@ -115,77 +112,18 @@ def main( fail_on_accuracy_quality_gates: bool = False, only_inference: bool = False, ): - """Main entry point for the pipeline execution. - - This entrypoint is where everything comes together: - - * configuring pipeline with the required parameters - (some of which may come from command line arguments) - * launching the pipeline - - Args: - no_cache: If `True` cache will be disabled. - no_drop_na: If `True` NA values will not be dropped from the dataset. - no_normalize: If `True` normalization will not be done for the dataset. - drop_columns: List of comma-separated names of columns to drop from the dataset. - test_size: Percentage of records from the training dataset to go into the test dataset. - min_train_accuracy: Minimum acceptable accuracy on the train set. - min_test_accuracy: Minimum acceptable accuracy on the test set. - fail_on_accuracy_quality_gates: If `True` and any of minimal accuracy - thresholds are violated - the pipeline will fail. If `False` thresholds will - not affect the pipeline. - only_inference: If `True` only inference pipeline will be triggered. - """ - - # Run a pipeline with the required parameters. This executes - # all steps in the pipeline in the correct order using the orchestrator - # stack component that is configured in your active ZenML stack. - pipeline_args = { - "config_path":os.path.join( - os.path.dirname(os.path.realpath(__file__)), - "config.yaml", - ) - } - if no_cache: - pipeline_args["enable_cache"] = False - - if not only_inference: - # Execute Training Pipeline - run_args_train = { - "drop_na": not no_drop_na, - "normalize": not no_normalize, - "random_seed": 42, - "test_size": test_size, - "min_train_accuracy": min_train_accuracy, - "min_test_accuracy": min_test_accuracy, - "fail_on_accuracy_quality_gates": fail_on_accuracy_quality_gates, - } - if drop_columns: - run_args_train["drop_columns"] = drop_columns.split(",") - - pipeline_args[ - "run_name" - ] = f"{{product_name}}_training_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}" - {{product_name}}_training.with_options(**pipeline_args)(**run_args_train) - logger.info("Training pipeline finished successfully!") - - # Execute Batch Inference Pipeline - run_args_inference = {} - pipeline_args[ - "run_name" - ] = f"{{product_name}}_batch_inference_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}" - {{product_name}}_batch_inference.with_options(**pipeline_args)(**run_args_inference) - - artifact = ExternalArtifact( - pipeline_name="{{product_name}}_batch_inference", - artifact_name="predictions", - ) - logger.info( - "Batch inference pipeline finished successfully! " - "You can find predictions in Artifact Store using ID: " - f"`{str(artifact.get_artifact_id())}`." 
+ main( + no_cache=no_cache, + no_drop_na=no_drop_na, + no_normalize=no_normalize, + drop_columns=drop_columns, + test_size=test_size, + min_train_accuracy=min_train_accuracy, + min_test_accuracy=min_test_accuracy, + fail_on_accuracy_quality_gates=fail_on_accuracy_quality_gates, + only_inference=only_inference, ) if __name__ == "__main__": - main() + main_click() diff --git a/tests/test_template.py b/tests/test_template.py index 9cccf68..a81a351 100644 --- a/tests/test_template.py +++ b/tests/test_template.py @@ -17,7 +17,6 @@ import pathlib import platform import shutil -import subprocess import sys from typing import Optional @@ -74,26 +73,17 @@ def generate_and_run_project( dst_path=str(dst_path), data=answers, unsafe=True, + vcs_ref="HEAD", ) as worker: worker.run_copy() # MLFlow Deployer not supported on Windows if platform.system().lower()!="windows": # run the project - call = [sys.executable, "run.py"] - - try: - subprocess.check_output( - call, - cwd=str(dst_path), - env=os.environ.copy(), - stderr=subprocess.STDOUT, - ) - except subprocess.CalledProcessError as e: - raise RuntimeError( - f"Failed to run project generated with parameters: {answers}\n" - f"{e.output.decode()}" - ) from e + sys.path.append(os.curdir) + from run import main + + main() # check the pipeline run is successful for pipeline_suffix in ["_training", "_batch_inference"]: From 5bb7ae8bf1e88a2057a567521a6c20d46fc4b009 Mon Sep 17 00:00:00 2001 From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com> Date: Thu, 12 Oct 2023 14:21:29 +0200 Subject: [PATCH 14/17] restore test flow --- template/run.py | 126 ++++++++++++++++++++++++++++++----------- tests/test_template.py | 23 ++++++-- 2 files changed, 111 insertions(+), 38 deletions(-) diff --git a/template/run.py b/template/run.py index b97a880..4be9b6d 100644 --- a/template/run.py +++ b/template/run.py @@ -1,11 +1,14 @@ # {% include 'template/license_header' %} import click +from datetime import datetime as dt +import os from typing import Optional +from zenml.artifacts.external_artifact import ExternalArtifact from zenml.logger import get_logger -from main import main +from pipelines import {{product_name}}_batch_inference, {{product_name}}_training logger = get_logger(__name__) @@ -20,28 +23,28 @@ Examples: -\b -# Run the pipeline with default options -python run.py - -\b -# Run the pipeline without cache -python run.py --no-cache + \b + # Run the pipeline with default options + python run.py + + \b + # Run the pipeline without cache + python run.py --no-cache -\b -# Run the pipeline without Hyperparameter tuning -python run.py --no-hp-tuning + \b + # Run the pipeline without Hyperparameter tuning + python run.py --no-hp-tuning -\b -# Run the pipeline without NA drop and normalization, -# but dropping columns [A,B,C] and keeping 10% of dataset -# as test set. -python run.py --no-drop-na --no-normalize --drop-columns A,B,C --test-size 0.1 + \b + # Run the pipeline without NA drop and normalization, + # but dropping columns [A,B,C] and keeping 10% of dataset + # as test set. + python run.py --no-drop-na --no-normalize --drop-columns A,B,C --test-size 0.1 -\b -# Run the pipeline with Quality Gate for accuracy set at 90% for train set -# and 85% for test set. If any of accuracies will be lower - pipeline will fail. -python run.py --min-train-accuracy 0.9 --min-test-accuracy 0.85 --fail-on-accuracy-quality-gates + \b + # Run the pipeline with Quality Gate for accuracy set at 90% for train set + # and 85% for test set. 
+ python run.py --min-train-accuracy 0.9 --min-test-accuracy 0.85 --fail-on-accuracy-quality-gates
 """
@@ -101,7 +104,7 @@
     default=False,
     help="Whether to run only inference pipeline.",
 )
-def main_click(
+def main(
     no_cache: bool = False,
     no_drop_na: bool = False,
     no_normalize: bool = False,
@@ -112,18 +115,77 @@ def main(
     fail_on_accuracy_quality_gates: bool = False,
     only_inference: bool = False,
 ):
-    main(
-        no_cache=no_cache,
-        no_drop_na=no_drop_na,
-        no_normalize=no_normalize,
-        drop_columns=drop_columns,
-        test_size=test_size,
-        min_train_accuracy=min_train_accuracy,
-        min_test_accuracy=min_test_accuracy,
-        fail_on_accuracy_quality_gates=fail_on_accuracy_quality_gates,
-        only_inference=only_inference,
+    """Main entry point for the pipeline execution.
+
+    This entrypoint is where everything comes together:
+
+    * configuring the pipeline with the required parameters
+    (some of which may come from command-line arguments)
+    * launching the pipeline
+
+    Args:
+        no_cache: If `True`, caching will be disabled.
+        no_drop_na: If `True`, NA values will not be dropped from the dataset.
+        no_normalize: If `True`, the dataset will not be normalized.
+        drop_columns: List of comma-separated names of columns to drop from the dataset.
+        test_size: Percentage of records from the training dataset to go into the test dataset.
+        min_train_accuracy: Minimum acceptable accuracy on the train set.
+        min_test_accuracy: Minimum acceptable accuracy on the test set.
+        fail_on_accuracy_quality_gates: If `True`, the pipeline will fail when any
+            of the minimum accuracy thresholds is violated. If `False`, the
+            thresholds will not affect the pipeline.
+        only_inference: If `True`, only the inference pipeline will be triggered.
+    """
+
+    # Run a pipeline with the required parameters. This executes
+    # all steps in the pipeline in the correct order using the orchestrator
+    # stack component that is configured in your active ZenML stack.
+    pipeline_args = {
+        "config_path": os.path.join(
+            os.path.dirname(os.path.realpath(__file__)),
+            "config.yaml",
+        )
+    }
+    if no_cache:
+        pipeline_args["enable_cache"] = False
+
+    if not only_inference:
+        # Execute Training Pipeline
+        run_args_train = {
+            "drop_na": not no_drop_na,
+            "normalize": not no_normalize,
+            "random_seed": 42,
+            "test_size": test_size,
+            "min_train_accuracy": min_train_accuracy,
+            "min_test_accuracy": min_test_accuracy,
+            "fail_on_accuracy_quality_gates": fail_on_accuracy_quality_gates,
+        }
+        if drop_columns:
+            run_args_train["drop_columns"] = drop_columns.split(",")
+
+        pipeline_args[
+            "run_name"
+        ] = f"{{product_name}}_training_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
+        {{product_name}}_training.with_options(**pipeline_args)(**run_args_train)
+        logger.info("Training pipeline finished successfully!")
+
+    # Execute Batch Inference Pipeline
+    run_args_inference = {}
+    pipeline_args[
+        "run_name"
+    ] = f"{{product_name}}_batch_inference_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
+    {{product_name}}_batch_inference.with_options(**pipeline_args)(**run_args_inference)
+
+    artifact = ExternalArtifact(
+        pipeline_name="{{product_name}}_batch_inference",
+        artifact_name="predictions",
+    )
+    logger.info(
+        "Batch inference pipeline finished successfully! "
+        "You can find predictions in the Artifact Store using ID: "
+        f"`{str(artifact.get_artifact_id())}`."
 )


 if __name__ == "__main__":
-    main_click()
+    main()
diff --git a/tests/test_template.py b/tests/test_template.py
index a81a351..62d3752 100644
--- a/tests/test_template.py
+++ b/tests/test_template.py
@@ -17,6 +17,7 @@
 import pathlib
 import platform
 import shutil
+import subprocess
 import sys
 from typing import Optional

@@ -73,17 +74,27 @@ def generate_and_run_project(
         dst_path=str(dst_path),
         data=answers,
         unsafe=True,
-        vcs_ref="HEAD",
     ) as worker:
         worker.run_copy()

     # MLFlow Deployer not supported on Windows
-    if platform.system().lower()!="windows":
+    # MLFlow `service daemon is not running` error on MacOS
+    if platform.system().lower() not in ["windows", "macos"]:
         # run the project
-        sys.path.append(os.curdir)
-        from run import main
-
-        main()
+        call = [sys.executable, "run.py"]
+
+        try:
+            subprocess.check_output(
+                call,
+                cwd=str(dst_path),
+                env=os.environ.copy(),
+                stderr=subprocess.STDOUT,
+            )
+        except subprocess.CalledProcessError as e:
+            raise RuntimeError(
+                f"Failed to run project generated with parameters: {answers}\n"
+                f"{e.output.decode()}"
+            ) from e

     # check the pipeline run is successful
     for pipeline_suffix in ["_training", "_batch_inference"]:

From 763f432343d0bf04f2c78c461d44c64fc6799cf9 Mon Sep 17 00:00:00 2001
From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com>
Date: Thu, 12 Oct 2023 14:29:20 +0200
Subject: [PATCH 15/17] update requirements

---
 requirements.txt | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index 3f1435b..96c96d8 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1 @@
-copier
-jinja2-time
+zenml[templates]

From c12735b972a1c704ac8827494cf6d9639be6b544 Mon Sep 17 00:00:00 2001
From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com>
Date: Thu, 12 Oct 2023 15:37:40 +0200
Subject: [PATCH 16/17] properly skip macos

---
 tests/test_template.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/test_template.py b/tests/test_template.py
index 62d3752..55ae0ea 100644
--- a/tests/test_template.py
+++ b/tests/test_template.py
@@ -79,7 +79,7 @@ def generate_and_run_project(

     # MLFlow Deployer not supported on Windows
     # MLFlow `service daemon is not running` error on MacOS
-    if platform.system().lower() not in ["windows", "macos"]:
+    if platform.system().lower() not in ["windows", "macos", "darwin"]:
         # run the project
         call = [sys.executable, "run.py"]

From a0ada1e3607d981f9701cb607f3ce44b590d8288 Mon Sep 17 00:00:00 2001
From: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com>
Date: Tue, 17 Oct 2023 14:50:47 +0200
Subject: [PATCH 17/17] revert ref

---
 .github/workflows/ci.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 82417f6..6fe0ba6 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -36,4 +36,4 @@ jobs:
     with:
       stack-name: ${{ matrix.stack-name }}
      python-version: ${{ matrix.python-version }}
-      ref-zenml: feature/OSS-2494-evaluate-config-early
+      ref-zenml: develop
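
Note on the resulting template: patches 13 and 14 settle on an `ExternalArtifact`-based lookup in `template/run.py`, where the entrypoint reports the ID of the "predictions" artifact rather than the data itself. A minimal, self-contained sketch of that lookup pattern, assuming the copier template has been rendered with `{{product_name}}` set to the hypothetical name `my_project` and that a batch inference run has already completed:

    from zenml.artifacts.external_artifact import ExternalArtifact
    from zenml.logger import get_logger

    logger = get_logger(__name__)

    # Reference the latest "predictions" artifact produced by the batch
    # inference pipeline. The data stays in the artifact store; only the
    # artifact's ID is resolved here.
    artifact = ExternalArtifact(
        pipeline_name="my_project_batch_inference",  # hypothetical rendered name
        artifact_name="predictions",
    )

    logger.info(f"Predictions artifact ID: `{artifact.get_artifact_id()}`")

Logging only the ID keeps the entrypoint lightweight: downstream consumers can fetch the predictions lazily from the artifact store instead of receiving them inline.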