Proposal: split train.py into train.py and train_aml.py (#219)
This change splits train.py into two files.

The new train.py is standalone and has no references to AzureML. It defines three functions: split_data, which splits a dataframe into test/train data; train_model, which takes the test/train data and a parameter object and trains the model; and get_model_metrics, which evaluates metrics for the model. The script can also be run locally, in which case it loads the dataset from a file.
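
As an illustration, invoking the new interface locally might look like the following sketch (the CSV path mirrors the local default used in the new main(), but treat it as an assumption; the function signatures are the ones introduced in this change):

    import pandas as pd
    from diabetes_regression.training.train import (
        split_data, train_model, get_model_metrics)

    # Load the diabetes data from a local file (path is illustrative)
    train_df = pd.read_csv("data/diabetes.csv")

    data = split_data(train_df)                # {"train": ..., "test": ...} splits
    model = train_model(data, {"alpha": 0.5})  # Ridge hyperparameters as a dict
    metrics = get_model_metrics(model, data)   # e.g. {"mse": ...}
    print(metrics)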

The second file, train_aml.py, contains reasonably general AzureML logic. It reads data from a dataset and calls the split_data function from train.py. It loads input parameters from a config file and logs them, then calls train_model from train.py. Finally, it uploads the model and logs any metrics returned by get_model_metrics.
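
Since train_aml.py is not expanded in this view, the flow it implements is roughly the following sketch (the dataset name and model file name are placeholders, and only azureml-core calls already used elsewhere in this repository appear; this is an outline, not the committed code):

    import json
    import os
    import joblib
    from azureml.core import Dataset
    from azureml.core.run import Run
    from diabetes_regression.training.train import (
        split_data, train_model, get_model_metrics)

    run = Run.get_context()

    # Load the training parameters from config.json and log them
    with open("config.json") as f:
        pars = json.load(f)
    ridge_args = {"alpha": pars["training"].get("alpha", 0.5)}
    run.log("alpha", ridge_args["alpha"])

    # Read the registered dataset and delegate to the core train.py functions
    dataset = Dataset.get_by_name(run.experiment.workspace, "diabetes_ds")  # placeholder name
    data = split_data(dataset.to_pandas_dataframe())
    model = train_model(data, ridge_args)

    # Log the metrics and upload the trained model for downstream steps
    for k, v in get_model_metrics(model, data).items():
        run.log(k, v)
    os.makedirs("outputs", exist_ok=True)
    joblib.dump(model, os.path.join("outputs", "sklearn_regression_model.pkl"))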

These changes are intended to demonstrate a simple interface for integrating an existing ML script with MLOpsPython, and to provide an example of how the core ML functionality can be invoked in multiple ways during development.

Co-authored-by: Bryan J Smith <[email protected]>
jotaylo and bjcmit authored Mar 5, 2020
1 parent 5966d3d commit 39609ae
Showing 6 changed files with 799 additions and 155 deletions.
.pipelines/diabetes_regression-variables-template.yml (2 changes: 1 addition & 1 deletion)
@@ -7,7 +7,7 @@ variables:
   value: diabetes_regression
 # The path to the model training script under SOURCES_DIR_TRAIN
 - name: TRAIN_SCRIPT_PATH
-  value: training/train.py
+  value: training/train_aml.py
 # The path to the model evaluation script under SOURCES_DIR_TRAIN
 - name: EVALUATE_SCRIPT_PATH
   value: evaluate/evaluate_model.py
diabetes_regression/training/test_train.py (39 changes: 22 additions & 17 deletions)
@@ -1,27 +1,32 @@
 import numpy as np
-from azureml.core.run import Run
-from unittest.mock import Mock
-from diabetes_regression.training.train import train_model
+from diabetes_regression.training.train import train_model, get_model_metrics
 
 
 def test_train_model():
     X_train = np.array([1, 2, 3, 4, 5, 6]).reshape(-1, 1)
     y_train = np.array([10, 9, 8, 8, 6, 5])
+    data = {"train": {"X": X_train, "y": y_train}}
+
+    reg_model = train_model(data, {"alpha": 1.2})
+
+    preds = reg_model.predict([[1], [2]])
+    np.testing.assert_equal(preds, [9.93939393939394, 9.03030303030303])
+
+
+def test_get_model_metrics():
+
+    class MockModel:
+
+        @staticmethod
+        def predict(data):
+            return ([8.12121212, 7.21212121])
+
     X_test = np.array([3, 4]).reshape(-1, 1)
     y_test = np.array([8, 7])
-    data = {"train": {"X": X_train, "y": y_train},
-            "test": {"X": X_test, "y": y_test}}
+    data = {"test": {"X": X_test, "y": y_test}}
 
-    run = Mock(Run)
-    reg = train_model(run, data, alpha=1.2)
+    metrics = get_model_metrics(MockModel(), data)
 
-    _, call2 = run.log.call_args_list
-    nameValue, descriptionDict = call2
-    name, value = nameValue
-    description = descriptionDict['description']
-    assert (name == 'mse')
-    np.testing.assert_almost_equal(value, 0.029843893480257067)
-    assert (description == 'Mean squared error metric')
-
-    preds = reg.predict([[1], [2]])
-    np.testing.assert_equal(preds, [9.93939393939394, 9.03030303030303])
+    assert 'mse' in metrics
+    mse = metrics['mse']
+    np.testing.assert_almost_equal(mse, 0.029843893480257067)
diabetes_regression/training/train.py (174 changes: 37 additions & 137 deletions)
@@ -23,161 +23,61 @@
 ARISING IN ANY WAY OUT OF THE USE OF THE SOFTWARE CODE, EVEN IF ADVISED OF THE
 POSSIBILITY OF SUCH DAMAGE.
 """
-from azureml.core.run import Run
 
 import os
-import argparse
+import pandas as pd
 from sklearn.linear_model import Ridge
 from sklearn.metrics import mean_squared_error
 from sklearn.model_selection import train_test_split
-import joblib
-import json
-from azureml.core import Dataset, Datastore, Workspace
 
 
-def register_dataset(
-    aml_workspace: Workspace,
-    dataset_name: str,
-    datastore_name: str,
-    file_path: str
-) -> Dataset:
-    datastore = Datastore.get(aml_workspace, datastore_name)
-    dataset = Dataset.Tabular.from_delimited_files(path=(datastore, file_path))
-    dataset = dataset.register(workspace=aml_workspace,
-                               name=dataset_name,
-                               create_new_version=True)
-
-    return dataset
-
-
-def train_model(run, data, alpha):
-    run.log("alpha", alpha)
-    run.parent.log("alpha", alpha)
-    reg = Ridge(alpha=alpha)
-    reg.fit(data["train"]["X"], data["train"]["y"])
-    preds = reg.predict(data["test"]["X"])
-    run.log("mse", mean_squared_error(
-        preds, data["test"]["y"]), description="Mean squared error metric")
-    run.parent.log("mse", mean_squared_error(
-        preds, data["test"]["y"]), description="Mean squared error metric")
-    return reg
-
-
-def main():
-    print("Running train.py")
-
-    parser = argparse.ArgumentParser("train")
-
-    parser.add_argument(
-        "--model_name",
-        type=str,
-        help="Name of the Model",
-        default="sklearn_regression_model.pkl",
-    )
-
-    parser.add_argument(
-        "--step_output",
-        type=str,
-        help=("output for passing data to next step")
-    )
-
-    parser.add_argument(
-        "--dataset_version",
-        type=str,
-        help=("dataset version")
-    )
-
-    parser.add_argument(
-        "--data_file_path",
-        type=str,
-        help=("data file path, if specified,\
-               a new version of the dataset will be registered")
-    )
-
-    parser.add_argument(
-        "--caller_run_id",
-        type=str,
-        help=("caller run id, for example ADF pipeline run id")
-    )
-
-    parser.add_argument(
-        "--dataset_name",
-        type=str,
-        help=("Dataset name. Dataset must be passed by name\
-               to always get the desired dataset version\
-               rather than the one used while the pipeline creation")
-    )
-
-    args = parser.parse_args()
-
-    print("Argument [model_name]: %s" % args.model_name)
-    print("Argument [step_output]: %s" % args.step_output)
-    print("Argument [dataset_version]: %s" % args.dataset_version)
-    print("Argument [data_file_path]: %s" % args.data_file_path)
-    print("Argument [caller_run_id]: %s" % args.caller_run_id)
-    print("Argument [dataset_name]: %s" % args.dataset_name)
-
-    model_name = args.model_name
-    step_output_path = args.step_output
-    dataset_version = args.dataset_version
-    data_file_path = args.data_file_path
-    dataset_name = args.dataset_name
-
-    print("Getting training parameters")
-
-    with open("config.json") as f:
-        pars = json.load(f)
-    try:
-        alpha = pars["training"]["alpha"]
-    except KeyError:
-        alpha = 0.5
-
-    print("Parameter alpha: %s" % alpha)
-
-    run = Run.get_context()
-
-    # Get the dataset
-    if (dataset_name):
-        if (data_file_path == 'none'):
-            dataset = Dataset.get_by_name(run.experiment.workspace, dataset_name, dataset_version)  # NOQA: E402, E501
-        else:
-            dataset = register_dataset(run.experiment.workspace,
-                                       dataset_name,
-                                       os.environ.get("DATASTORE_NAME"),
-                                       data_file_path)
-    else:
-        e = ("No dataset provided")
-        print(e)
-        raise Exception(e)
-
-    # Link dataset to the step run so it is trackable in the UI
-    run.input_datasets['training_data'] = dataset
-    run.parent.tag("dataset_id", value=dataset.id)
-
-    df = dataset.to_pandas_dataframe()
+# Split the dataframe into test and train data
+def split_data(df):
     X = df.drop('Y', axis=1).values
     y = df['Y'].values
 
     X_train, X_test, y_train, y_test = train_test_split(
         X, y, test_size=0.2, random_state=0)
     data = {"train": {"X": X_train, "y": y_train},
             "test": {"X": X_test, "y": y_test}}
+    return data
 
 
+# Train the model, return the model
+def train_model(data, ridge_args):
+    reg_model = Ridge(**ridge_args)
+    reg_model.fit(data["train"]["X"], data["train"]["y"])
+    return reg_model
+
+
+# Evaluate the metrics for the model
+def get_model_metrics(model, data):
+    preds = model.predict(data["test"]["X"])
+    mse = mean_squared_error(preds, data["test"]["y"])
+    metrics = {"mse": mse}
+    return metrics
+
+
+def main():
+    print("Running train.py")
 
-    reg = train_model(run, data, alpha)
+    # Define training parameters
+    ridge_args = {"alpha": 0.5}
 
-    # Pass model file to next step
-    os.makedirs(step_output_path, exist_ok=True)
-    model_output_path = os.path.join(step_output_path, model_name)
-    joblib.dump(value=reg, filename=model_output_path)
+    # Load the training data as dataframe
+    data_dir = "data"
+    data_file = os.path.join(data_dir, 'diabetes.csv')
+    train_df = pd.read_csv(data_file)
 
-    # Also upload model file to run outputs for history
-    os.makedirs('outputs', exist_ok=True)
-    output_path = os.path.join('outputs', model_name)
-    joblib.dump(value=reg, filename=output_path)
+    data = split_data(train_df)
 
-    run.tag("run_type", value="train")
-    print(f"tags now present for run: {run.tags}")
+    # Train the model
+    model = train_model(data, ridge_args)
 
-    run.complete()
+    # Log the metrics for the model
+    metrics = get_model_metrics(model, data)
+    for (k, v) in metrics.items():
+        print(f"{k}: {v}")
 
 
 if __name__ == '__main__':
[3 more changed files not shown, including the new diabetes_regression/training/train_aml.py.]