Skip to content

Commit

Permalink
testing inside python
Browse files Browse the repository at this point in the history
  • Loading branch information
avishniakov committed Oct 12, 2023
1 parent 7237b16 commit e9c7009
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 109 deletions.
95 changes: 95 additions & 0 deletions template/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# {% include 'template/license_header' %}

from datetime import datetime as dt
import os
from typing import Optional

from zenml.artifacts.external_artifact import ExternalArtifact
from zenml.logger import get_logger

from pipelines import {{product_name}}_batch_inference, {{product_name}}_training

# Module-level logger obtained from ZenML's `get_logger`, keyed by this module's name.
logger = get_logger(__name__)


def main(
no_cache: bool = False,
no_drop_na: bool = False,
no_normalize: bool = False,
drop_columns: Optional[str] = None,
test_size: float = 0.2,
min_train_accuracy: float = 0.8,
min_test_accuracy: float = 0.8,
fail_on_accuracy_quality_gates: bool = False,
only_inference: bool = False,
):
"""Main entry point for the pipeline execution.
This entrypoint is where everything comes together:
* configuring pipeline with the required parameters
(some of which may come from command line arguments)
* launching the pipeline
Args:
no_cache: If `True` cache will be disabled.
no_drop_na: If `True` NA values will not be dropped from the dataset.
no_normalize: If `True` normalization will not be done for the dataset.
drop_columns: List of comma-separated names of columns to drop from the dataset.
test_size: Percentage of records from the training dataset to go into the test dataset.
min_train_accuracy: Minimum acceptable accuracy on the train set.
min_test_accuracy: Minimum acceptable accuracy on the test set.
fail_on_accuracy_quality_gates: If `True` and any of minimal accuracy
thresholds are violated - the pipeline will fail. If `False` thresholds will
not affect the pipeline.
only_inference: If `True` only inference pipeline will be triggered.
"""

# Run a pipeline with the required parameters. This executes
# all steps in the pipeline in the correct order using the orchestrator
# stack component that is configured in your active ZenML stack.
pipeline_args = {
"config_path":os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"config.yaml",
)
}
if no_cache:
pipeline_args["enable_cache"] = False

if not only_inference:
# Execute Training Pipeline
run_args_train = {
"drop_na": not no_drop_na,
"normalize": not no_normalize,
"random_seed": 42,
"test_size": test_size,
"min_train_accuracy": min_train_accuracy,
"min_test_accuracy": min_test_accuracy,
"fail_on_accuracy_quality_gates": fail_on_accuracy_quality_gates,
}
if drop_columns:
run_args_train["drop_columns"] = drop_columns.split(",")

pipeline_args[
"run_name"
] = f"{{product_name}}_training_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
{{product_name}}_training.with_options(**pipeline_args)(**run_args_train)
logger.info("Training pipeline finished successfully!")

# Execute Batch Inference Pipeline
run_args_inference = {}
pipeline_args[
"run_name"
] = f"{{product_name}}_batch_inference_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
{{product_name}}_batch_inference.with_options(**pipeline_args)(**run_args_inference)

artifact = ExternalArtifact(
pipeline_name="{{product_name}}_batch_inference",
artifact_name="predictions",
)
logger.info(
"Batch inference pipeline finished successfully! "
"You can find predictions in Artifact Store using ID: "
f"`{str(artifact.get_artifact_id())}`."
)
126 changes: 32 additions & 94 deletions template/run.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
# {% include 'template/license_header' %}

import click
from datetime import datetime as dt
import os
from typing import Optional

from zenml.artifacts.external_artifact import ExternalArtifact
from zenml.logger import get_logger

from pipelines import {{product_name}}_batch_inference, {{product_name}}_training
from main import main

logger = get_logger(__name__)

Expand All @@ -23,28 +20,28 @@
Examples:
\b
# Run the pipeline with default options
python run.py
\b
# Run the pipeline without cache
python run.py --no-cache
\b
# Run the pipeline with default options
python run.py
\b
# Run the pipeline without cache
python run.py --no-cache
\b
# Run the pipeline without Hyperparameter tuning
python run.py --no-hp-tuning
\b
# Run the pipeline without Hyperparameter tuning
python run.py --no-hp-tuning
\b
# Run the pipeline without NA drop and normalization,
# but dropping columns [A,B,C] and keeping 10% of dataset
# as test set.
python run.py --no-drop-na --no-normalize --drop-columns A,B,C --test-size 0.1
\b
# Run the pipeline without NA drop and normalization,
# but dropping columns [A,B,C] and keeping 10% of dataset
# as test set.
python run.py --no-drop-na --no-normalize --drop-columns A,B,C --test-size 0.1
\b
# Run the pipeline with Quality Gate for accuracy set at 90% for train set
# and 85% for test set. If any of accuracies will be lower - pipeline will fail.
python run.py --min-train-accuracy 0.9 --min-test-accuracy 0.85 --fail-on-accuracy-quality-gates
\b
# Run the pipeline with Quality Gate for accuracy set at 90% for train set
# and 85% for test set. If any of accuracies will be lower - pipeline will fail.
python run.py --min-train-accuracy 0.9 --min-test-accuracy 0.85 --fail-on-accuracy-quality-gates
"""
Expand Down Expand Up @@ -104,7 +101,7 @@
default=False,
help="Whether to run only inference pipeline.",
)
def main(
def main_click(
no_cache: bool = False,
no_drop_na: bool = False,
no_normalize: bool = False,
Expand All @@ -115,77 +112,18 @@ def main(
fail_on_accuracy_quality_gates: bool = False,
only_inference: bool = False,
):
"""Main entry point for the pipeline execution.
This entrypoint is where everything comes together:
* configuring pipeline with the required parameters
(some of which may come from command line arguments)
* launching the pipeline
Args:
no_cache: If `True` cache will be disabled.
no_drop_na: If `True` NA values will not be dropped from the dataset.
no_normalize: If `True` normalization will not be done for the dataset.
drop_columns: List of comma-separated names of columns to drop from the dataset.
test_size: Percentage of records from the training dataset to go into the test dataset.
min_train_accuracy: Minimum acceptable accuracy on the train set.
min_test_accuracy: Minimum acceptable accuracy on the test set.
fail_on_accuracy_quality_gates: If `True` and any of minimal accuracy
thresholds are violated - the pipeline will fail. If `False` thresholds will
not affect the pipeline.
only_inference: If `True` only inference pipeline will be triggered.
"""

# Run a pipeline with the required parameters. This executes
# all steps in the pipeline in the correct order using the orchestrator
# stack component that is configured in your active ZenML stack.
pipeline_args = {
"config_path":os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"config.yaml",
)
}
if no_cache:
pipeline_args["enable_cache"] = False

if not only_inference:
# Execute Training Pipeline
run_args_train = {
"drop_na": not no_drop_na,
"normalize": not no_normalize,
"random_seed": 42,
"test_size": test_size,
"min_train_accuracy": min_train_accuracy,
"min_test_accuracy": min_test_accuracy,
"fail_on_accuracy_quality_gates": fail_on_accuracy_quality_gates,
}
if drop_columns:
run_args_train["drop_columns"] = drop_columns.split(",")

pipeline_args[
"run_name"
] = f"{{product_name}}_training_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
{{product_name}}_training.with_options(**pipeline_args)(**run_args_train)
logger.info("Training pipeline finished successfully!")

# Execute Batch Inference Pipeline
run_args_inference = {}
pipeline_args[
"run_name"
] = f"{{product_name}}_batch_inference_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
{{product_name}}_batch_inference.with_options(**pipeline_args)(**run_args_inference)

artifact = ExternalArtifact(
pipeline_name="{{product_name}}_batch_inference",
artifact_name="predictions",
)
logger.info(
"Batch inference pipeline finished successfully! "
"You can find predictions in Artifact Store using ID: "
f"`{str(artifact.get_artifact_id())}`."
main(
no_cache=no_cache,
no_drop_na=no_drop_na,
no_normalize=no_normalize,
drop_columns=drop_columns,
test_size=test_size,
min_train_accuracy=min_train_accuracy,
min_test_accuracy=min_test_accuracy,
fail_on_accuracy_quality_gates=fail_on_accuracy_quality_gates,
only_inference=only_inference,
)


if __name__ == "__main__":
main()
main_click()
20 changes: 5 additions & 15 deletions tests/test_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import pathlib
import platform
import shutil
import subprocess
import sys
from typing import Optional

Expand Down Expand Up @@ -74,26 +73,17 @@ def generate_and_run_project(
dst_path=str(dst_path),
data=answers,
unsafe=True,
vcs_ref="HEAD",
) as worker:
worker.run_copy()

# MLFlow Deployer not supported on Windows
if platform.system().lower()!="windows":
# run the project
call = [sys.executable, "run.py"]

try:
subprocess.check_output(
call,
cwd=str(dst_path),
env=os.environ.copy(),
stderr=subprocess.STDOUT,
)
except subprocess.CalledProcessError as e:
raise RuntimeError(
f"Failed to run project generated with parameters: {answers}\n"
f"{e.output.decode()}"
) from e
sys.path.append(os.curdir)
from run import main

main()

# check the pipeline run is successful
for pipeline_suffix in ["_training", "_batch_inference"]:
Expand Down

0 comments on commit e9c7009

Please sign in to comment.