Skip to content

Commit

Permalink
Automatically name training dir in pytorch pipeline (jpata#238)
Browse files Browse the repository at this point in the history
* feat: specify number of samples as cmd line arg

--ntrain: training samples
--ntest: validation samples

* feat: automatically name training dir

* fix: formatting

* fix: test script passing old cmd line arg

* fix: test only requires --load when not given --train
  • Loading branch information
erwulff authored Oct 16, 2023
1 parent 4cb8a81 commit 9d21b93
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 45 deletions.
2 changes: 1 addition & 1 deletion mlpf/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
prepare_callbacks,
create_comet_experiment,
)
from utils import create_experiment_dir
from tfmodel.utils import (
create_experiment_dir,
delete_all_but_best_checkpoint,
get_best_checkpoint,
get_datasets,
Expand Down
18 changes: 3 additions & 15 deletions mlpf/pyg/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import json
import os
import os.path as osp
import pickle as pkl
from typing import List, Optional

Expand Down Expand Up @@ -122,25 +120,15 @@
}


def save_mlpf(args, mlpf, model_kwargs):
def save_mlpf(args, mlpf, model_kwargs, outdir):
"""Simple function to store the model parameters and training hyperparameters."""

if not osp.isdir(args.model_prefix):
os.system(f"mkdir -p {args.model_prefix}")

else: # if directory already exists
assert args.overwrite, f"model {args.model_prefix} already exists, please delete it"

print("model already exists, deleting it")
os.system(f"rm -rf {args.model_prefix}")
os.system(f"mkdir -p {args.model_prefix}")

with open(f"{args.model_prefix}/model_kwargs.pkl", "wb") as f: # dump model architecture
with open(f"{outdir}/model_kwargs.pkl", "wb") as f: # dump model architecture
pkl.dump(model_kwargs, f, protocol=pkl.HIGHEST_PROTOCOL)

num_mlpf_parameters = sum(p.numel() for p in mlpf.parameters() if p.requires_grad)

with open(f"{args.model_prefix}/hyperparameters.json", "w") as fp: # dump hyperparameters
with open(f"{outdir}/hyperparameters.json", "w") as fp: # dump hyperparameters
json.dump({**{"Num of mlpf parameters": num_mlpf_parameters}, **vars(args)}, fp)


Expand Down
44 changes: 32 additions & 12 deletions mlpf/pyg_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import argparse
import logging
import os
from pathlib import Path
import pickle as pkl

import yaml
Expand All @@ -21,21 +22,22 @@
from pyg.mlpf import MLPF
from pyg.training import train_mlpf
from pyg.utils import CLASS_LABELS, X_FEATURES, PFDataset, InterleavedIterator, save_mlpf
from utils import create_experiment_dir

logging.basicConfig(level=logging.INFO)

parser = argparse.ArgumentParser()

parser.add_argument("--config", type=str, default="parameters/pyg-config.yaml", help="yaml config")
parser.add_argument("--model-prefix", type=str, default="experiments/MLPF_model", help="directory to hold the model")
parser.add_argument("--prefix", type=str, default="test_", help="prefix appended to result dir name")
parser.add_argument("--overwrite", dest="overwrite", action="store_true", help="overwrites the model if True")
parser.add_argument("--data_dir", type=str, default="/pfvol/tensorflow_datasets/", help="path to `tensorflow_datasets/`")
parser.add_argument("--gpus", type=str, default="0", help="to use CPU set to empty string; else e.g., `0,1`")
parser.add_argument(
"--gpu-batch-multiplier", type=int, default=1, help="Increase batch size per GPU by this constant factor"
)
parser.add_argument("--dataset", type=str, choices=["clic", "cms", "delphes"], required=True, help="which dataset?")
parser.add_argument("--load", action="store_true", help="load the model (no training)")
parser.add_argument("--load", type=str, default=None, help="dir from which to load a saved model")
parser.add_argument("--train", action="store_true", help="initiates a training")
parser.add_argument("--test", action="store_true", help="tests the model")
parser.add_argument("--num-epochs", type=int, default=3, help="number of training epochs")
Expand All @@ -60,16 +62,20 @@ def run(rank, world_size, args):
config = yaml.safe_load(stream)

if args.load: # load a pre-trained model
with open(f"{args.model_prefix}/model_kwargs.pkl", "rb") as f:
outdir = args.load

with open(f"{outdir}/model_kwargs.pkl", "rb") as f:
model_kwargs = pkl.load(f)

model = MLPF(**model_kwargs)

model_state = torch.load(f"{args.model_prefix}/best_epoch_weights.pth", map_location=torch.device(rank))
model_state = torch.load(f"{outdir}/best_epoch_weights.pth", map_location=torch.device(rank))
if isinstance(model, torch.nn.parallel.DistributedDataParallel):
model.module.load_state_dict(model_state)
else:
model.load_state_dict(model_state)
if (rank == 0) or (rank == "cpu"):
_logger.info(f"Loaded model weights from {outdir}/best_epoch_weight.pth")

else: # instantiate a new model
model_kwargs = {
Expand All @@ -79,18 +85,24 @@ def run(rank, world_size, args):
}
model = MLPF(**model_kwargs)

save_mlpf(args, model, model_kwargs) # save model_kwargs and hyperparameters

model.to(rank)

if world_size > 1:
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])

_logger.info(model)
_logger.info(f"Model directory {args.model_prefix}", color="bold")
if (rank == 0) or (rank == "cpu"):
_logger.info(model)

if args.train:
# always create a new outdir when training a model to never overwrite
# loaded weights from previous trainings
if (rank == 0) or (rank == "cpu"):
outdir = create_experiment_dir(prefix=args.prefix + Path(args.config).stem + "_")
save_mlpf(args, model, model_kwargs, outdir) # save model_kwargs and hyperparameters
_logger.info("Creating experiment dir {}".format(outdir))
_logger.info(f"Model directory {outdir}", color="bold")

train_loaders, valid_loaders = [], []
for sample in config["train_dataset"][args.dataset]:
version = config["train_dataset"][args.dataset][sample]["version"]
Expand Down Expand Up @@ -124,10 +136,18 @@ def run(rank, world_size, args):
args.num_epochs,
args.patience,
args.lr,
args.model_prefix,
outdir,
)

if args.test:

if args.load is None:
# if we don't load, we must have a newly trained model
assert args.train, "Please train a model before testing, or load a model with --load"
assert outdir is not None, "Error: no outdir to evaluate model from"
else:
outdir = args.load

test_loaders = {}
for sample in config["test_dataset"][args.dataset]:
version = config["test_dataset"][args.dataset][sample]["version"]
Expand All @@ -138,7 +158,7 @@ def run(rank, world_size, args):

test_loaders[sample] = InterleavedIterator([ds.get_loader(batch_size=batch_size, world_size=world_size)])

model_state = torch.load(f"{args.model_prefix}/best_epoch_weights.pth", map_location=torch.device(rank))
model_state = torch.load(f"{outdir}/best_epoch_weights.pth", map_location=torch.device(rank))
if isinstance(model, torch.nn.parallel.DistributedDataParallel):
model.module.load_state_dict(model_state)
else:
Expand All @@ -147,14 +167,14 @@ def run(rank, world_size, args):
for sample in test_loaders:
_logger.info(f"Running predictions on {sample}")
torch.cuda.empty_cache()
run_predictions(rank, model, test_loaders[sample], sample, args.model_prefix)
run_predictions(rank, model, test_loaders[sample], sample, outdir)

if (rank == 0) or (rank == "cpu"): # make plots and export to onnx only on a single machine
if args.make_plots:
for sample in config["test_dataset"][args.dataset]:
_logger.info(f"Plotting distributions for {sample}")

make_plots(args.model_prefix, sample, args.dataset)
make_plots(outdir, sample, args.dataset)

if args.export_onnx:
try:
Expand Down
16 changes: 0 additions & 16 deletions mlpf/tfmodel/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import datetime
import logging
import os
import pickle
import platform
import re
from pathlib import Path

Expand Down Expand Up @@ -124,20 +122,6 @@ def parse_config(config, ntrain=None, ntest=None, nepochs=None, weights=None):
return config, config_file_stem


def create_experiment_dir(prefix=None, suffix=None):
    """Create a new timestamped directory under ``experiments/`` and log it.

    The directory name is the current local time formatted as
    ``YYYYmmdd_HHMMSS_ffffff``, optionally preceded by ``prefix``.

    Args:
        prefix: Optional string placed directly in front of the timestamp.
        suffix: When not ``None``, ``"." + platform.node()`` is added to the
            end of the directory name. Only the presence of the argument is
            checked; its value is not part of the resulting name.

    Returns:
        The path of the created directory as a ``str``.

    Raises:
        FileExistsError: If the directory already exists (``mkdir`` is called
            without ``exist_ok``).
    """
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    leaf = timestamp if prefix is None else prefix + timestamp

    if suffix is not None:
        # The host name, not the passed value, is what gets appended.
        leaf += "." + platform.node()

    train_dir = Path("experiments") / leaf
    train_dir.mkdir(parents=True)
    logging.info("Creating experiment dir {}".format(train_dir))
    return str(train_dir)


def get_best_checkpoint(train_dir):
checkpoint_list = list(Path(Path(train_dir) / "weights").glob("weights*.hdf5"))
# Sort the checkpoints according to the loss in their filenames
Expand Down
16 changes: 16 additions & 0 deletions mlpf/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from pathlib import Path
import datetime
import platform


def create_experiment_dir(prefix=None, suffix=None):
    """Create a fresh, timestamped experiment directory under ``experiments/``.

    The directory name is built from the current local time
    (``YYYYmmdd_HHMMSS_ffffff``), with ``prefix`` placed in front of it when
    one is given.

    Args:
        prefix: Optional string prepended to the timestamp.
        suffix: If not ``None``, the machine's network name
            (``platform.node()``) is appended after a ``"."``.
            NOTE: the value of ``suffix`` itself never appears in the name;
            it only acts as a switch.

    Returns:
        The created directory path, converted to ``str``.

    Raises:
        FileExistsError: If a directory with the same name already exists,
            since ``mkdir`` is called without ``exist_ok``.
    """
    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")

    if prefix is None:
        dir_name = stamp
    else:
        dir_name = prefix + stamp

    if suffix is not None:
        dir_name = dir_name + "." + platform.node()

    experiment_path = Path("experiments") / dir_name
    # parents=True also creates "experiments/" on first use.
    experiment_path.mkdir(parents=True)
    return str(experiment_path)
2 changes: 1 addition & 1 deletion scripts/local_test_pyg.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ mkdir -p experiments

tfds build mlpf/heptfds/cms_pf/ttbar --manual_dir ./local_test_data

python mlpf/pyg_pipeline.py --config parameters/pyg-config-test.yaml --dataset cms --data_dir ./tensorflow_datasets/ --model-prefix ./experiments/MLPF_test --gpus "" --train --test --make-plots
python mlpf/pyg_pipeline.py --config parameters/pyg-config-test.yaml --dataset cms --data_dir ./tensorflow_datasets/ --prefix MLPF_test --gpus "" --train --test --make-plots

0 comments on commit 9d21b93

Please sign in to comment.