Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Artifact Control Plane #24

Merged
merged 2 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ jobs:
with:
stack-name: ${{ matrix.stack-name }}
python-version: ${{ matrix.python-version }}
ref-zenml: feature/OSS-2609-OSS-2575-model-config-is-model-version
ref-zenml: feature/OSS-2190-data-as-first-class-citizen
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,15 @@ To make the most of the Model Control Plane we additionally annotate the output
<summary>Code snippet 💻</summary>

```python
from zenml.model import ModelArtifactConfig
from zenml import ArtifactConfig

experiment_tracker = Client().active_stack.experiment_tracker
@step(experiment_tracker=experiment_tracker.name)
def model_trainer(
...
) -> Annotated[ClassifierMixin, "model", ModelArtifactConfig()]:
) -> Annotated[
ClassifierMixin, ArtifactConfig(name="model", is_model_artifact=True)
]:
...
```
</details>
Expand Down Expand Up @@ -308,7 +310,7 @@ You can follow [Data Validators docs](https://docs.zenml.io/stacks-and-component

As a last step concluding all work done so far, we will calculate predictions on the inference dataset and persist them in [Artifact Store](https://docs.zenml.io/stacks-and-components/component-guide/artifact-stores) attached to the current inference model version of the Model Control Plane for reuse and observability.

We will leverage a prepared predictions service called `mlflow_deployment` linked to the inference model version of the Model Control Plane to run `.predict()` and to put predictions as an output of the predictions step, so it is automatically stored in the [Artifact Store](https://docs.zenml.io/stacks-and-components/component-guide/artifact-stores) and linked to the Model Control Plane model version as a versioned artifact link with zero effort. This is achieved because we additionally annotated the `predictions` output with `DataArtifactConfig(overwrite=False)`. This is required to deliver a comprehensive history to stakeholders since Batch Inference can be executed using the same Model Control Plane version multiple times.
We will leverage a prepared predictions service called `mlflow_deployment` linked to the inference model version of the Model Control Plane to run `.predict()` and to put predictions as an output of the predictions step, so it is automatically stored in the [Artifact Store](https://docs.zenml.io/stacks-and-components/component-guide/artifact-stores) and linked to the Model Control Plane model version as a versioned artifact link with zero effort.

```
NOTE: On non-local orchestrators a `model` artifact will be loaded into memory to run predictions directly. You can adapt this part to your needs.
Expand All @@ -318,12 +320,10 @@ NOTE: On non-local orchestrators a `model` artifact will be loaded into memory t
<summary>Code snippet 💻</summary>

```python
from zenml.model import DataArtifactConfig

@step
def inference_predict(
dataset_inf: pd.DataFrame,
) -> Annotated[pd.Series, "predictions", DataArtifactConfig(overwrite=False)]:
) -> Annotated[pd.Series, "predictions"]:
model_version = get_step_context().model_version

# get predictor
Expand Down
16 changes: 4 additions & 12 deletions template/pipelines/batch_inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
notify_on_failure,
notify_on_success,
)
from zenml import pipeline
from zenml.artifacts.external_artifact import ExternalArtifact
from zenml import ExternalArtifact, pipeline
from zenml.integrations.evidently.metrics import EvidentlyMetricConfig
from zenml.integrations.evidently.steps import evidently_report_step
from zenml.logger import get_logger
Expand All @@ -32,26 +31,19 @@ def {{product_name}}_batch_inference():
# of one step as the input of the next step.
########## ETL stage ##########
df_inference, target, _ = data_loader(
random_state=ExternalArtifact(
model_artifact_pipeline_name="{{product_name}}_training",
model_artifact_name="random_state",
),
random_state=ExternalArtifact(name="random_state"),
is_inference=True
)
df_inference = inference_data_preprocessor(
dataset_inf=df_inference,
preprocess_pipeline=ExternalArtifact(
model_artifact_name="preprocess_pipeline",
),
preprocess_pipeline=ExternalArtifact(name="preprocess_pipeline"),
target=target,
)

{%- if data_quality_checks %}
########## DataQuality stage ##########
report, _ = evidently_report_step(
reference_dataset=ExternalArtifact(
model_artifact_name="dataset_trn",
),
reference_dataset=ExternalArtifact(name="dataset_trn"),
comparison_dataset=df_inference,
ignored_cols=["target"],
metrics=[
Expand Down
22 changes: 10 additions & 12 deletions template/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import os
from typing import Optional

from zenml.artifacts.external_artifact import ExternalArtifact
from zenml.client import Client
from zenml.logger import get_logger

from pipelines import {{product_name}}_batch_inference, {{product_name}}_training, {{product_name}}_deployment
Expand Down Expand Up @@ -192,17 +192,15 @@ def main(
] = f"{{product_name}}_batch_inference_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
{{product_name}}_batch_inference.with_options(**pipeline_args)(**run_args_inference)

artifact = ExternalArtifact(
model_artifact_name="predictions",
model_name="{{ product_name }}",
model_version="{{ target_environment }}",
model_artifact_version=None, # can be skipped - using latest artifact link
)
logger.info(
"Batch inference pipeline finished successfully! "
"You can find predictions in Artifact Store using ID: "
f"`{str(artifact.get_artifact_id())}`."
)
client = Client()
model_version = client.get_model_version("e2e_use_case", "staging")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will not work, because once you first run a pipeline composition on a clean environment there is no staging model version, so it will fail to retrieve. I suggest you remove this part for now and I'll follow up on this in https://zenml.atlassian.net/browse/OSS-2669

artifact = model_version.get_data_artifact("predictions")
if artifact:
logger.info(
"Batch inference pipeline finished successfully! "
"You can find predictions in Artifact Store using ID: "
f"`{str(artifact.id)}`."
)


if __name__ == "__main__":
Expand Down
6 changes: 2 additions & 4 deletions template/steps/deployment/deployment_deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@
from typing import Optional

from typing_extensions import Annotated
from zenml import get_step_context, step
from zenml import ArtifactConfig, get_step_context, step
from zenml.client import Client
from zenml.integrations.mlflow.services.mlflow_deployment import MLFlowDeploymentService
from zenml.integrations.mlflow.steps.mlflow_deployer import (
mlflow_model_registry_deployer_step,
)
from zenml.logger import get_logger
from zenml.model import EndpointArtifactConfig

logger = get_logger(__name__)

Expand All @@ -20,8 +19,7 @@
def deployment_deploy() -> (
Annotated[
Optional[MLFlowDeploymentService],
"mlflow_deployment",
EndpointArtifactConfig(),
ArtifactConfig(name="mlflow_deployment", is_endpoint_artifact=True),
]
):
"""Predictions step.
Expand Down
7 changes: 1 addition & 6 deletions template/steps/etl/inference_data_preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,14 @@
import pandas as pd
from sklearn.pipeline import Pipeline
from zenml import step
from zenml.model import DataArtifactConfig


@step
def inference_data_preprocessor(
dataset_inf: pd.DataFrame,
preprocess_pipeline: Pipeline,
target: str,
) -> Annotated[
pd.DataFrame,
"dataset_inf",
DataArtifactConfig(overwrite=False, artifact_name="inference_dataset"),
]:
) -> Annotated[pd.DataFrame, "inference_dataset"]:
"""Data preprocessor step.

This is an example of a data processor step that prepares the data so that
Expand Down
11 changes: 5 additions & 6 deletions template/steps/inference/inference_predict.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@
from zenml import get_step_context, step
from zenml.integrations.mlflow.services.mlflow_deployment import MLFlowDeploymentService
from zenml.logger import get_logger
from zenml.model import DataArtifactConfig

logger = get_logger(__name__)


@step
def inference_predict(
dataset_inf: pd.DataFrame,
) -> Annotated[pd.Series, "predictions", DataArtifactConfig(overwrite=False)]:
) -> Annotated[pd.Series, "predictions"]:
"""Predictions step.

This is an example of a predictions step that takes the data in and returns
Expand All @@ -39,9 +38,9 @@ def inference_predict(
model_version = get_step_context().model_version

# get predictor
predictor_service: Optional[MLFlowDeploymentService] = model_version.get_endpoint_artifact(
"mlflow_deployment"
).load()
predictor_service: Optional[
MLFlowDeploymentService
] = model_version.load_artifact("mlflow_deployment")
if predictor_service is not None:
# run prediction from service
predictions = predictor_service.predict(request=dataset_inf)
Expand All @@ -51,7 +50,7 @@ def inference_predict(
"as the orchestrator is not local."
)
# run prediction from memory
predictor = model_version.get_model_artifact("model").load()
predictor = model_version.load_artifact("model")
predictions = predictor.predict(dataset_inf)

predictions = pd.Series(predictions, name="predicted")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ from typing_extensions import Annotated
import pandas as pd
from sklearn.metrics import accuracy_score
from zenml import step, get_step_context
from zenml.model import ModelVersion
from zenml.model.model_version import ModelVersion
from zenml.logger import get_logger

logger = get_logger(__name__)
Expand Down Expand Up @@ -54,8 +54,8 @@ def compute_performance_metrics_on_current_data(
else:
# Get predictors
predictors = {
latest_version_number: latest_version.get_model_artifact("model").load(),
current_version_number: current_version.get_model_artifact("model").load(),
latest_version_number: latest_version.load_artifact("model"),
current_version_number: current_version.load_artifact("model"),
}

metrics = {}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# {% include 'template/license_header' %}

from zenml import get_step_context, step
from zenml.model import ModelVersion
from zenml.model.model_version import ModelVersion
from zenml.logger import get_logger

from utils import promote_in_model_registry
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# {% include 'template/license_header' %}

from zenml import get_step_context, step
from zenml.model import ModelVersion
from zenml.model.model_version import ModelVersion
from zenml.logger import get_logger

from utils import promote_in_model_registry
Expand Down
21 changes: 12 additions & 9 deletions template/steps/training/model_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@
import mlflow
import pandas as pd
from sklearn.base import ClassifierMixin
from zenml import log_artifact_metadata, step
from zenml import ArtifactConfig, log_artifact_metadata, step
from zenml.client import Client
from zenml.integrations.mlflow.experiment_trackers import MLFlowExperimentTracker
from zenml.integrations.mlflow.steps.mlflow_registry import mlflow_register_model_step
from zenml.logger import get_logger
from zenml.model import ModelArtifactConfig

logger = get_logger(__name__)

Expand All @@ -31,7 +30,9 @@ def model_trainer(
model: ClassifierMixin,
target: str,
name: str,
) -> Annotated[ClassifierMixin, "model", ModelArtifactConfig()]:
) -> Annotated[
ClassifierMixin, ArtifactConfig(name="model", is_model_artifact=True)
]:
"""Configure and train a model on the training dataset.

This is an example of a model training step that takes in a dataset artifact
Expand Down Expand Up @@ -79,12 +80,14 @@ def model_trainer(
name=name,
)
# keep track of mlflow version for future use
log_artifact_metadata(
output_name="model",
model_registry_version=Client()
.active_stack.model_registry.list_model_versions(name=name)[-1]
.version,
)
model_registry = Client().active_stack.model_registry
if model_registry:
versions = model_registry.list_model_versions(name=name)
if versions:
log_artifact_metadata(
metadata={"model_registry_version": versions[-1].version},
artifact_name="model",
)
### YOUR CODE ENDS HERE ###

return model
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ def hp_tuning_select_best_model(
best_metric = -1
# consume artifacts attached to current model version in Model Control Plane
for step_name in step_names:
hp_output = model_version.get_data_artifact(
step_name=step_name, name="hp_result"
)
hp_output = model_version.get_data_artifact("hp_result")
model: ClassifierMixin = hp_output.load()
# fetch metadata we attached earlier
metric = float(hp_output.run_metadata["metric"].value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ def hp_tuning_single_search(
score = accuracy_score(y_tst, y_pred)
# log score along with output artifact as metadata
log_artifact_metadata(
output_name="hp_result",
metric=float(score),
metadata={"metric": float(score)},
artifact_name="hp_result",
)
### YOUR CODE ENDS HERE ###
return cv.best_estimator_
Loading