Skip to content

Commit

Permalink
update template to follow changes
Browse files Browse the repository at this point in the history
  • Loading branch information
avishniakov committed Nov 13, 2023
1 parent 16a147d commit 96a6f5b
Show file tree
Hide file tree
Showing 19 changed files with 119 additions and 134 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ jobs:
with:
stack-name: ${{ matrix.stack-name }}
python-version: ${{ matrix.python-version }}
ref-zenml: develop
ref-zenml: feature/OSS-2609-OSS-2575-model-config-is-model-version
44 changes: 27 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,8 @@ We will be going section by section diving into implementation details and shari
The training pipeline is designed to create a new Model Control Plane version and promote it to the inference stage upon successfully passing quality assurance at the end of the pipeline. This ensures that we always infer only on a quality-assured Model Control Plane version and provides seamless integration of the required artifacts of this Model Control Plane version in later inference runs.
This is achieved by providing this configuration in `train_config.yaml` used to configure our pipeline:
```yaml
model_config:
model_version:
name: your_product_name
...
create_new_model_version: true
```
### [Continuous Training] Training Pipeline: ETL steps
Expand Down Expand Up @@ -145,20 +143,20 @@ After the steps are executed we need to collect results (one best model per each
```python
from zenml import get_step_context
model_version = get_step_context().model_config._get_model_version()
model_version = get_step_context().model_version
best_model = None
best_metric = -1
# consume artifacts attached to current model version in Model Control Plane
for full_artifact_name in model_version.artifact_object_ids:
# if artifacts comes from one of HP tuning steps
if full_artifact_name.endswith("hp_result"):
hp_output = model_version.artifacts[full_artifact_name]["1"]
model: ClassifierMixin = hp_output.load()
# fetch metadata we attached earlier
metric = float(hp_output.metadata["metric"].value)
if best_model is None or best_metric < metric:
best_model = model
for step_name in step_names:
hp_output = model_version.get_data_artifact(
step_name=step_name, name="hp_result"
)
model: ClassifierMixin = hp_output.load()
# fetch metadata we attached earlier
metric = float(hp_output.metadata["metric"].value)
if best_model is None or best_metric < metric:
best_model = model
```
</details>

Expand Down Expand Up @@ -239,7 +237,7 @@ By doing so we ensure that the best-performing version will be used for inferenc
The Deployment pipeline is designed to run with the inference Model Control Plane version context. This ensures that we always infer only on a quality-assured Model Control Plane version and provides seamless integration of the required artifacts created during the training of this Model Control Plane version.
This is achieved by providing this configuration in `deployer_config.yaml` used to configure our pipeline:
```yaml
model_config:
model_version:
name: your_product_name
version: production
```
Expand All @@ -259,7 +257,7 @@ NOTE: In this template a prediction service is only created for local orchestrat
The Batch Inference pipeline is designed to run with the inference Model Control Plane version context. This ensures that we always infer only on a quality-assured Model Control Plane version and provides seamless integration of the required artifacts created during the training of this Model Control Plane version.
This is achieved by providing this configuration in `inference_config.yaml` used to configure our pipeline:
```yaml
model_config:
model_version:
name: your_product_name
version: production
```
Expand Down Expand Up @@ -324,10 +322,22 @@ from zenml.model import ArtifactConfig

@step
def inference_predict(
deployment_service: MLFlowDeploymentService,
dataset_inf: pd.DataFrame,
) -> Annotated[pd.Series, "predictions", ArtifactConfig(overwrite=False)]:
predictions = deployment_service.predict(request=dataset_inf)
model_version = get_step_context().model_version

# get predictor
predictor_service: Optional[MLFlowDeploymentService] = model_version.get_endpoint_artifact(
"mlflow_deployment"
).load()
if predictor_service is not None:
# run prediction from service
predictions = predictor_service.predict(request=dataset_inf)
else:
# run prediction from memory
predictor = model_version.get_model_artifact("model").load()
predictions = predictor.predict(dataset_inf)

predictions = pd.Series(predictions, name="predicted")
return predictions
```
Expand Down
2 changes: 1 addition & 1 deletion template/configs/deployer_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ steps:
notify_on_success: False

# configuration of the Model Control Plane
model_config:
model_version:
name: {{ product_name }}
version: {{ target_environment }}

Expand Down
2 changes: 1 addition & 1 deletion template/configs/inference_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ steps:
notify_on_success: False

# configuration of the Model Control Plane
model_config:
model_version:
name: {{ product_name }}
version: {{ target_environment }}

Expand Down
3 changes: 1 addition & 2 deletions template/configs/train_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ steps:
notify_on_success: False

# configuration of the Model Control Plane
model_config:
model_version:
name: {{ product_name }}
license: {{ open_source_license }}
description: {{ product_name }} E2E Batch Use Case
Expand All @@ -52,7 +52,6 @@ model_config:
- sklearn
- from template
- ZenML delivered
create_new_model_version: true

# pipeline level extra configurations
extra:
Expand Down
2 changes: 1 addition & 1 deletion template/pipelines/batch_inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
notify_on_success,
)
from zenml import pipeline
from zenml.artifacts.external_artifact import ExternalArtifact
from zenml.integrations.evidently.metrics import EvidentlyMetricConfig
from zenml.integrations.evidently.steps import evidently_report_step
from zenml.logger import get_logger
from zenml.artifacts.external_artifact import ExternalArtifact

logger = get_logger(__name__)

Expand Down
3 changes: 1 addition & 2 deletions template/pipelines/deployment.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# {% include 'template/license_header' %}

from steps import deployment_deploy,notify_on_success,notify_on_failure

from steps import deployment_deploy, notify_on_failure, notify_on_success
from zenml import pipeline


Expand Down
2 changes: 1 addition & 1 deletion template/pipelines/training.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def {{product_name}}_training(
target=target,
)
after.append(step_name)
best_model = hp_tuning_select_best_model(after=after)
best_model = hp_tuning_select_best_model(step_names=after, after=after)
{%- else %}
model_configuration = pipeline_extra["model_configuration"]
best_model = get_model_from_config(
Expand Down
21 changes: 10 additions & 11 deletions template/steps/deployment/deployment_deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,16 @@


from typing import Optional
from typing_extensions import Annotated

from zenml import step, get_step_context
from typing_extensions import Annotated
from zenml import get_step_context, step
from zenml.client import Client
from zenml.logger import get_logger
from zenml.model import DeploymentArtifactConfig
from zenml.integrations.mlflow.services.mlflow_deployment import MLFlowDeploymentService
from zenml.integrations.mlflow.steps.mlflow_deployer import (
mlflow_model_registry_deployer_step,
)
from zenml.integrations.mlflow.services.mlflow_deployment import (
MLFlowDeploymentService,
)
from utils import get_model_registry_version
from zenml.logger import get_logger
from zenml.model import DeploymentArtifactConfig

logger = get_logger(__name__)

Expand Down Expand Up @@ -47,12 +44,14 @@ def deployment_deploy() -> (
"""
### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ###
if Client().active_stack.orchestrator.flavor == "local":
model_version = get_step_context().model_config._get_model_version()
model_version = get_step_context().model_version

# deploy predictor service
deployment_service = mlflow_model_registry_deployer_step.entrypoint(
registry_model_name=model_version.model.name,
registry_model_version=get_model_registry_version(model_version),
registry_model_name=model_version.name,
registry_model_version=model_version.get_model_artifact("model")
.metadata["model_registry_version"]
.value,
replace_existing=True,
)
else:
Expand Down
2 changes: 1 addition & 1 deletion template/steps/etl/data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@


from typing import Tuple
from typing_extensions import Annotated

import pandas as pd
from sklearn.datasets import load_breast_cancer
from typing_extensions import Annotated
from zenml import step
from zenml.logger import get_logger

Expand Down
12 changes: 5 additions & 7 deletions template/steps/inference/inference_predict.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@
from typing_extensions import Annotated

import pandas as pd
from zenml import step, get_step_context
from zenml import get_step_context, step
from zenml.integrations.mlflow.services.mlflow_deployment import MLFlowDeploymentService
from zenml.logger import get_logger
from zenml.model import ArtifactConfig
from zenml.integrations.mlflow.services.mlflow_deployment import (
MLFlowDeploymentService,
)

logger = get_logger(__name__)

Expand Down Expand Up @@ -38,10 +36,10 @@ def inference_predict(
The predictions as pandas series
"""
### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ###
model_version = get_step_context().model_config._get_model_version()
model_version = get_step_context().model_version

# get predictor
predictor_service: Optional[MLFlowDeploymentService] = model_version.get_deployment(
predictor_service: Optional[MLFlowDeploymentService] = model_version.get_endpoint_artifact(
"mlflow_deployment"
).load()
if predictor_service is not None:
Expand All @@ -53,7 +51,7 @@ def inference_predict(
"as the orchestrator is not local."
)
# run prediction from memory
predictor = model_version.get_model_object("model").load()
predictor = model_version.get_model_artifact("model").load()
predictions = predictor.predict(dataset_inf)

predictions = pd.Series(predictions, name="predicted")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ from typing_extensions import Annotated

import pandas as pd
from sklearn.metrics import accuracy_score
from zenml import step
from zenml import step, get_step_context
from zenml.model import ModelVersion
from zenml.logger import get_logger

from utils import get_model_versions

logger = get_logger(__name__)

@step
Expand Down Expand Up @@ -43,21 +42,27 @@ def compute_performance_metrics_on_current_data(
logger.info("Evaluating model metrics...")

# Get model version numbers from Model Control Plane
latest_version, current_version = get_model_versions(target_env)
latest_version = get_step_context().model_version
current_version = ModelVersion(name=latest_version.name, version=target_env)

latest_version_number = latest_version.number
current_version_number = current_version.number

# Get predictors
predictors = {
latest_version.number: latest_version.get_model_object("model").load(),
current_version.number: current_version.get_model_object("model").load(),
}
if current_version_number is None:
current_version_number = -1
metrics = {latest_version_number:1.0,current_version_number:0.0}
else:
# Get predictors
predictors = {
latest_version_number: latest_version.get_model_artifact("model").load(),
current_version_number: current_version.get_model_artifact("model").load(),
}

if latest_version != current_version:
metrics = {}
for version in [latest_version.number,current_version.number]:
for version in [latest_version_number,current_version_number]:
# predict and evaluate
predictions = predictors[version].predict(X)
metrics[version]=accuracy_score(y, predictions)
else:
metrics = {latest_version.number:1.0,current_version.number:0.0}

### YOUR CODE ENDS HERE ###
return metrics[latest_version.number],metrics[current_version.number]
return metrics[latest_version_number],metrics[current_version_number]
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# {% include 'template/license_header' %}

from zenml import get_step_context, step
from zenml.model import ModelVersion
from zenml.logger import get_logger

from utils import get_model_versions, promote_in_model_registry, get_model_registry_version
from utils import promote_in_model_registry

logger = get_logger(__name__)

Expand Down Expand Up @@ -43,9 +44,13 @@ def promote_with_metric_compare(
should_promote = True

# Get model version numbers from Model Control Plane
latest_version, current_version = get_model_versions(target_env)
latest_version = get_step_context().model_version
current_version = ModelVersion(name=latest_version.name, version=target_env)

if latest_version.number == current_version.number:
latest_version_number = latest_version.number
current_version_number = current_version.number

if current_version_number is None:
logger.info("No current model version found - promoting latest")
else:
logger.info(
Expand All @@ -62,21 +67,28 @@ def promote_with_metric_compare(
)
should_promote = False

promoted_version = get_model_registry_version(current_version)

if should_promote:
# Promote in Model Control Plane
model_version = get_step_context().model_config._get_model_version()
model_version = get_step_context().model_version
model_version.set_stage(stage=target_env, force=True)
logger.info(f"Current model version was promoted to '{target_env}'.")

# Promote in Model Registry
latest_version_model_registry_number = latest_version.get_model_artifact("model").metadata["model_registry_version"].value
if current_version_number is None:
current_version_model_registry_number = latest_version_model_registry_number
else:
current_version_model_registry_number = current_version.get_model_artifact("model").metadata["model_registry_version"].value
promote_in_model_registry(
latest_version=get_model_registry_version(latest_version),
current_version=get_model_registry_version(current_version),
latest_version=latest_version_model_registry_number,
current_version=current_version_model_registry_number,
model_name=mlflow_model_name,
target_env=target_env.capitalize(),
)
promoted_version = get_model_registry_version(latest_version)
promoted_version = latest_version_model_registry_number
else:
promoted_version = current_version.get_model_artifact("model").metadata["model_registry_version"].value

logger.info(
f"Current model version in `{target_env}` is `{promoted_version}` registered in Model Registry"
Expand Down
Loading

0 comments on commit 96a6f5b

Please sign in to comment.