diff --git a/classifier-e2e/run.ipynb b/classifier-e2e/run_full.ipynb similarity index 76% rename from classifier-e2e/run.ipynb rename to classifier-e2e/run_full.ipynb index 31d9292b..1edcd747 100644 --- a/classifier-e2e/run.ipynb +++ b/classifier-e2e/run_full.ipynb @@ -1,32 +1,19 @@ { "cells": [ - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# pre-demo recommendations\n", - "! zenml connect --url https://1cf18d95-zenml.cloudinfra.zenml.io \n", - "! zenml model delete breast_cancer_classifier -y" - ] - }, { "cell_type": "markdown", - "id": "63ab391a", + "id": "63f7ab34", "metadata": {}, "source": [ - "# Intro to MLOps using ZenML\n", + "# 🌍 Overview\n", "\n", - "## 🌍 Overview\n", - "\n", - "This repository is a minimalistic MLOps project intended as a starting point to learn how to put ML workflows in production. It features: \n", + "This demo is a minimalistic MLOps project intended to showcase how to put ML workflows in production. It features: \n", "\n", "- A feature engineering pipeline that loads data and prepares it for training.\n", "- A training pipeline that loads the preprocessed dataset and trains a model.\n", "- A batch inference pipeline that runs predictions on the trained model with new data.\n", - "\n", - "Follow along this notebook to understand how you can use ZenML to productionalize your ML workflows!\n", + "- A stack switch that leverages the SageMaker step operator to outsource training to the cloud.\n", + "- An analysis of training artifacts and their lineage (including the connection to W&B).\n", "\n", "\"Pipelines" ] } @@ -49,7 +36,10 @@ "metadata": {}, "outputs": [], "source": [ - "!zenml integration install sklearn xgboost -y\n", + "! pip3 install -r requirements.txt\n", + "! zenml integration install sklearn xgboost -y\n", + "! zenml connect --url https://1cf18d95-zenml.cloudinfra.zenml.io \n", + "! zenml model delete breast_cancer_classifier -y\n", "\n", "import IPython\n", "IPython.Application.instance().kernel.do_shutdown(restart=True)" ] }, @@ -64,24 +54,6 @@ "the end of the installation, the notebook kernel will automatically restart." ] }, - { - "cell_type": "markdown", - "id": "e3955ff1", - "metadata": {}, - "source": [ - "Optional: If you are using [ZenML Cloud](https://zenml.io/cloud), execute the following cell with your tenant URL. Otherwise ignore."
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "e2587315", - "metadata": {}, - "outputs": [], - "source": [ - "!zenml connect --url https://1cf18d95-zenml.cloudinfra.zenml.io" - ] - }, { "cell_type": "code", "execution_count": null, @@ -91,7 +63,6 @@ "source": [ "# Initialize ZenML and set the default stack\n", "!zenml init\n", - "\n", "!zenml stack set local-sagemaker-step-operator-wandb" ] }, @@ -112,19 +83,13 @@ "from zenml.logger import get_logger\n", "from uuid import UUID\n", "\n", - "from typing import Optional, List\n", - "\n", "from zenml import pipeline\n", "\n", "from steps import (\n", " data_loader,\n", - " data_preprocessor,\n", - " data_splitter,\n", - " model_evaluator,\n", " inference_preprocessor\n", ")\n", - "\n", - "from zenml.logger import get_logger\n", + "from pipelines import feature_engineering, training\n", "\n", "logger = get_logger(__name__)\n", "\n", @@ -236,39 +201,97 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "id": "b50a9537", "metadata": {}, - "outputs": [], - "source": [ - "@pipeline\n", - "def feature_engineering(\n", - " test_size: float = 0.3,\n", - " drop_na: Optional[bool] = None,\n", - " normalize: Optional[bool] = None,\n", - " drop_columns: Optional[List[str]] = None,\n", - " target: Optional[str] = \"target\",\n", - " random_state: int = 17\n", - "):\n", - " \"\"\"Feature engineering pipeline.\"\"\"\n", - " # Link all the steps together by calling them and passing the output\n", - " # of one step as the input of the next step.\n", - " raw_data = data_loader(random_state=random_state, target=target)\n", - " dataset_trn, dataset_tst = data_splitter(\n", - " dataset=raw_data,\n", - " test_size=test_size,\n", - " )\n", - " dataset_trn, dataset_tst, _ = data_preprocessor(\n", - " dataset_trn=dataset_trn,\n", - " dataset_tst=dataset_tst,\n", - " drop_na=drop_na,\n", - " normalize=normalize,\n", - " drop_columns=drop_columns,\n", - " target=target,\n", - " random_state=random_state,\n", - " )\n", - "\n", - " return dataset_trn, dataset_tst" + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\u001b[0;31m# Apache Software License 2.0\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;31m#\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;31m# Copyright (c) ZenML GmbH 2024. 
All rights reserved.\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;31m#\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;31m# Licensed under the Apache License, Version 2.0 (the \"License\");\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;31m# you may not use this file except in compliance with the License.\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;31m# You may obtain a copy of the License at\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;31m#\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;31m# http://www.apache.org/licenses/LICENSE-2.0\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;31m#\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;31m# Unless required by applicable law or agreed to in writing, software\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;31m# distributed under the License is distributed on an \"AS IS\" BASIS,\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;31m# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;31m# See the License for the specific language governing permissions and\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;31m# limitations under the License.\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;31m#\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;32mfrom\u001b[0m \u001b[0mtyping\u001b[0m \u001b[0;32mimport\u001b[0m \u001b[0mList\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mOptional\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;32mimport\u001b[0m \u001b[0mrandom\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;32mfrom\u001b[0m \u001b[0msteps\u001b[0m \u001b[0;32mimport\u001b[0m \u001b[0;34m(\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m \u001b[0mdata_loader\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m \u001b[0mdata_preprocessor\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m \u001b[0mdata_splitter\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;32mfrom\u001b[0m \u001b[0mzenml\u001b[0m \u001b[0;32mimport\u001b[0m \u001b[0mpipeline\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;32mfrom\u001b[0m \u001b[0mzenml\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mlogger\u001b[0m \u001b[0;32mimport\u001b[0m \u001b[0mget_logger\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0mlogger\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mget_logger\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0m__name__\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;34m@\u001b[0m\u001b[0mpipeline\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;32mdef\u001b[0m \u001b[0mfeature_engineering\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m \u001b[0mtest_size\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0mfloat\u001b[0m 
\u001b[0;34m=\u001b[0m \u001b[0;36m0.2\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m \u001b[0mdrop_na\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0mOptional\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0mbool\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m \u001b[0mnormalize\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0mOptional\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0mbool\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m \u001b[0mdrop_columns\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0mOptional\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0mList\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0mstr\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m \u001b[0mtarget\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0mOptional\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0mstr\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m\"target\"\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m \u001b[0mrandom_state\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0mint\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m \u001b[0;34m\"\"\"\u001b[0m\n", + "\u001b[0;34m Feature engineering pipeline.\u001b[0m\n", + "\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m This is a pipeline that loads the data, processes it and splits\u001b[0m\n", + "\u001b[0;34m it into train and test sets.\u001b[0m\n", + "\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m Args:\u001b[0m\n", + "\u001b[0;34m test_size: Size of holdout set for training 0.0..1.0\u001b[0m\n", + "\u001b[0;34m drop_na: If `True` NA values will be removed from dataset\u001b[0m\n", + "\u001b[0;34m normalize: If `True` dataset will be normalized with MinMaxScaler\u001b[0m\n", + "\u001b[0;34m drop_columns: List of columns to drop from dataset\u001b[0m\n", + "\u001b[0;34m target: Name of target column in dataset\u001b[0m\n", + "\u001b[0;34m random_state: Random state to configure the data loader\u001b[0m\n", + "\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m Returns:\u001b[0m\n", + "\u001b[0;34m The processed datasets (dataset_trn, dataset_tst).\u001b[0m\n", + "\u001b[0;34m \"\"\"\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m \u001b[0;31m# Link all the steps together by calling them and passing the output\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m \u001b[0;31m# of one step as the input of the next step.\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mrandom_state\u001b[0m \u001b[0;32mis\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m \u001b[0mrandom_state\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mrandom\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mrandint\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;36m1000\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m \u001b[0mraw_data\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mdata_loader\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mrandom_state\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mrandom_state\u001b[0m\u001b[0;34m,\u001b[0m 
\u001b[0mtarget\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mtarget\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m    \u001b[0mdataset_trn\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mdataset_tst\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mdata_splitter\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m        \u001b[0mdataset\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mraw_data\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m        \u001b[0mtest_size\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mtest_size\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m    \u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m    \u001b[0mdataset_trn\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mdataset_tst\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0m_\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mdata_preprocessor\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m        \u001b[0mdataset_trn\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mdataset_trn\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m        \u001b[0mdataset_tst\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mdataset_tst\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m        \u001b[0mdrop_na\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mdrop_na\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m        \u001b[0mnormalize\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mnormalize\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m        \u001b[0mdrop_columns\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mdrop_columns\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m        \u001b[0mtarget\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mtarget\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m        \u001b[0mrandom_state\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mrandom_state\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m    \u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\n", + "\u001b[0;34m\u001b[0m    \u001b[0;32mreturn\u001b[0m \u001b[0mdataset_trn\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mdataset_tst\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n" + ] + } + ], + "source": [ + "# let's see how the feature engineering pipeline is implemented\n", + "%pycat pipelines/feature_engineering.py" ] }, { @@ -351,7 +374,6 @@ "metadata": {}, "outputs": [], "source": [ - "client = Client()\n", "run = client.get_pipeline(\"feature_engineering\").last_run\n", "print(run.name)" ] }, @@ -485,42 +507,8 @@ "metadata": {}, "outputs": [], "source": [ - "import pandas as pd\n", - "from sklearn.base import ClassifierMixin\n", - "from sklearn.linear_model import SGDClassifier\n", - "from typing_extensions import Annotated\n", - "from zenml import ArtifactConfig, step\n", - "from zenml.logger import get_logger\n", - "\n", - "logger = get_logger(__name__)\n", - "\n", - "\n", - "@step\n", - "def model_trainer(\n", - "    dataset_trn: pd.DataFrame,\n", - "    model_type: str = \"sgd\",\n", - ") -> Annotated[\n", - "    ClassifierMixin,\n", - "    ArtifactConfig(name=\"breast_cancer_classifier\", is_model_artifact=True),\n", - "]:\n", - "    \"\"\"Configure and train a model on the training dataset.\"\"\"\n", - "    target = \"target\"\n", - "    if model_type == \"sgd\":\n", - "        model = SGDClassifier()\n", - "    elif model_type == \"xgboost\":\n", - "        from xgboost import XGBClassifier\n", - "\n", - "        model = XGBClassifier()\n", - "    else:\n", - "        raise
ValueError(f\"Unknown model type {model_type}\")\n", - "\n", - " logger.info(f\"Training model {model}...\")\n", - "\n", - " model.fit(\n", - " dataset_trn.drop(columns=[target]),\n", - " dataset_trn[target],\n", - " )\n", - " return model" + "# let's have a look at training pipeline\n", + "%pycat steps/inference_predict.py" ] }, { @@ -550,35 +538,8 @@ "metadata": {}, "outputs": [], "source": [ - "@pipeline\n", - "def training(\n", - " train_dataset_id: Optional[UUID] = None,\n", - " test_dataset_id: Optional[UUID] = None,\n", - " model_type: str = \"sgd\",\n", - " min_train_accuracy: float = 0.0,\n", - " min_test_accuracy: float = 0.0,\n", - "):\n", - " \"\"\"Model training pipeline.\"\"\" \n", - " if train_dataset_id is None or test_dataset_id is None:\n", - " # If we dont pass the IDs, this will run the feature engineering pipeline \n", - " dataset_trn, dataset_tst = feature_engineering()\n", - " else:\n", - " # Load the datasets from an older pipeline\n", - " dataset_trn = client.get_artifact_version(name_id_or_prefix=train_dataset_id)\n", - " dataset_tst = client.get_artifact_version(name_id_or_prefix=test_dataset_id)\n", - "\n", - " trained_model = model_trainer(\n", - " dataset_trn=dataset_trn,\n", - " model_type=model_type,\n", - " )\n", - "\n", - " model_evaluator(\n", - " model=trained_model,\n", - " dataset_trn=dataset_trn,\n", - " dataset_tst=dataset_tst,\n", - " min_train_accuracy=min_train_accuracy,\n", - " min_test_accuracy=min_test_accuracy,\n", - " )" + "# let's have a look at training pipeline\n", + "%pycat pipelines/inference.py" ] }, {