diff --git a/mlpf/pipeline.py b/mlpf/pipeline.py index 2d7ece032..47f1b648e 100644 --- a/mlpf/pipeline.py +++ b/mlpf/pipeline.py @@ -37,8 +37,8 @@ prepare_callbacks, create_comet_experiment, ) +from utils import create_experiment_dir from tfmodel.utils import ( - create_experiment_dir, delete_all_but_best_checkpoint, get_best_checkpoint, get_datasets, diff --git a/mlpf/pyg/utils.py b/mlpf/pyg/utils.py index 1bb1d3877..307bade5e 100644 --- a/mlpf/pyg/utils.py +++ b/mlpf/pyg/utils.py @@ -1,6 +1,4 @@ import json -import os -import os.path as osp import pickle as pkl from typing import List, Optional @@ -122,25 +120,15 @@ } -def save_mlpf(args, mlpf, model_kwargs): +def save_mlpf(args, mlpf, model_kwargs, outdir): """Simple function to store the model parameters and training hyperparameters.""" - if not osp.isdir(args.model_prefix): - os.system(f"mkdir -p {args.model_prefix}") - - else: # if directory already exists - assert args.overwrite, f"model {args.model_prefix} already exists, please delete it" - - print("model already exists, deleting it") - os.system(f"rm -rf {args.model_prefix}") - os.system(f"mkdir -p {args.model_prefix}") - - with open(f"{args.model_prefix}/model_kwargs.pkl", "wb") as f: # dump model architecture + with open(f"{outdir}/model_kwargs.pkl", "wb") as f: # dump model architecture pkl.dump(model_kwargs, f, protocol=pkl.HIGHEST_PROTOCOL) num_mlpf_parameters = sum(p.numel() for p in mlpf.parameters() if p.requires_grad) - with open(f"{args.model_prefix}/hyperparameters.json", "w") as fp: # dump hyperparameters + with open(f"{outdir}/hyperparameters.json", "w") as fp: # dump hyperparameters json.dump({**{"Num of mlpf parameters": num_mlpf_parameters}, **vars(args)}, fp) diff --git a/mlpf/pyg_pipeline.py b/mlpf/pyg_pipeline.py index 93341fbc0..e27cc035e 100644 --- a/mlpf/pyg_pipeline.py +++ b/mlpf/pyg_pipeline.py @@ -7,6 +7,7 @@ import argparse import logging import os +from pathlib import Path import pickle as pkl import yaml @@ -21,13 +22,14 @@ from pyg.mlpf import MLPF from pyg.training import train_mlpf from pyg.utils import CLASS_LABELS, X_FEATURES, PFDataset, InterleavedIterator, save_mlpf +from utils import create_experiment_dir logging.basicConfig(level=logging.INFO) parser = argparse.ArgumentParser() parser.add_argument("--config", type=str, default="parameters/pyg-config.yaml", help="yaml config") -parser.add_argument("--model-prefix", type=str, default="experiments/MLPF_model", help="directory to hold the model") +parser.add_argument("--prefix", type=str, default="test_", help="prefix appended to result dir name") parser.add_argument("--overwrite", dest="overwrite", action="store_true", help="overwrites the model if True") parser.add_argument("--data_dir", type=str, default="/pfvol/tensorflow_datasets/", help="path to `tensorflow_datasets/`") parser.add_argument("--gpus", type=str, default="0", help="to use CPU set to empty string; else e.g., `0,1`") @@ -35,7 +37,7 @@ "--gpu-batch-multiplier", type=int, default=1, help="Increase batch size per GPU by this constant factor" ) parser.add_argument("--dataset", type=str, choices=["clic", "cms", "delphes"], required=True, help="which dataset?") -parser.add_argument("--load", action="store_true", help="load the model (no training)") +parser.add_argument("--load", type=str, default=None, help="dir from which to load a saved model") parser.add_argument("--train", action="store_true", help="initiates a training") parser.add_argument("--test", action="store_true", help="tests the model") parser.add_argument("--num-epochs", type=int, default=3, help="number of training epochs") @@ -60,16 +62,20 @@ def run(rank, world_size, args): config = yaml.safe_load(stream) if args.load: # load a pre-trained model - with open(f"{args.model_prefix}/model_kwargs.pkl", "rb") as f: + outdir = args.load + + with open(f"{outdir}/model_kwargs.pkl", "rb") as f: model_kwargs = pkl.load(f) model = MLPF(**model_kwargs) - model_state = torch.load(f"{args.model_prefix}/best_epoch_weights.pth", map_location=torch.device(rank)) + model_state = torch.load(f"{outdir}/best_epoch_weights.pth", map_location=torch.device(rank)) if isinstance(model, torch.nn.parallel.DistributedDataParallel): model.module.load_state_dict(model_state) else: model.load_state_dict(model_state) + if (rank == 0) or (rank == "cpu"): + _logger.info(f"Loaded model weights from {outdir}/best_epoch_weight.pth") else: # instantiate a new model model_kwargs = { @@ -79,18 +85,24 @@ def run(rank, world_size, args): } model = MLPF(**model_kwargs) - save_mlpf(args, model, model_kwargs) # save model_kwargs and hyperparameters - model.to(rank) if world_size > 1: model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model) model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank]) - _logger.info(model) - _logger.info(f"Model directory {args.model_prefix}", color="bold") + if (rank == 0) or (rank == "cpu"): + _logger.info(model) if args.train: + # always create a new outdir when training a model to never overwrite + # loaded weights from previous trainings + if (rank == 0) or (rank == "cpu"): + outdir = create_experiment_dir(prefix=args.prefix + Path(args.config).stem + "_") + save_mlpf(args, model, model_kwargs, outdir) # save model_kwargs and hyperparameters + _logger.info("Creating experiment dir {}".format(outdir)) + _logger.info(f"Model directory {outdir}", color="bold") + train_loaders, valid_loaders = [], [] for sample in config["train_dataset"][args.dataset]: version = config["train_dataset"][args.dataset][sample]["version"] @@ -124,10 +136,18 @@ def run(rank, world_size, args): args.num_epochs, args.patience, args.lr, - args.model_prefix, + outdir, ) if args.test: + + if args.load is None: + # if we don't load, we must have a newly trained model + assert args.train, "Please train a model before testing, or load a model with --load" + assert outdir is not None, "Error: no outdir to evaluate model from" + else: + outdir = args.load + test_loaders = {} for sample in config["test_dataset"][args.dataset]: version = config["test_dataset"][args.dataset][sample]["version"] @@ -138,7 +158,7 @@ def run(rank, world_size, args): test_loaders[sample] = InterleavedIterator([ds.get_loader(batch_size=batch_size, world_size=world_size)]) - model_state = torch.load(f"{args.model_prefix}/best_epoch_weights.pth", map_location=torch.device(rank)) + model_state = torch.load(f"{outdir}/best_epoch_weights.pth", map_location=torch.device(rank)) if isinstance(model, torch.nn.parallel.DistributedDataParallel): model.module.load_state_dict(model_state) else: @@ -147,14 +167,14 @@ def run(rank, world_size, args): for sample in test_loaders: _logger.info(f"Running predictions on {sample}") torch.cuda.empty_cache() - run_predictions(rank, model, test_loaders[sample], sample, args.model_prefix) + run_predictions(rank, model, test_loaders[sample], sample, outdir) if (rank == 0) or (rank == "cpu"): # make plots and export to onnx only on a single machine if args.make_plots: for sample in config["test_dataset"][args.dataset]: _logger.info(f"Plotting distributions for {sample}") - make_plots(args.model_prefix, sample, args.dataset) + make_plots(outdir, sample, args.dataset) if args.export_onnx: try: diff --git a/mlpf/tfmodel/utils.py b/mlpf/tfmodel/utils.py index 29f18298d..e6700939c 100644 --- a/mlpf/tfmodel/utils.py +++ b/mlpf/tfmodel/utils.py @@ -1,8 +1,6 @@ -import datetime import logging import os import pickle -import platform import re from pathlib import Path @@ -124,20 +122,6 @@ def parse_config(config, ntrain=None, ntest=None, nepochs=None, weights=None): return config, config_file_stem -def create_experiment_dir(prefix=None, suffix=None): - if prefix is None: - train_dir = Path("experiments") / datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f") - else: - train_dir = Path("experiments") / (prefix + datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")) - - if suffix is not None: - train_dir = train_dir.with_name(train_dir.name + "." + platform.node()) - - train_dir.mkdir(parents=True) - logging.info("Creating experiment dir {}".format(train_dir)) - return str(train_dir) - - def get_best_checkpoint(train_dir): checkpoint_list = list(Path(Path(train_dir) / "weights").glob("weights*.hdf5")) # Sort the checkpoints according to the loss in their filenames diff --git a/mlpf/utils.py b/mlpf/utils.py new file mode 100644 index 000000000..9ff96be60 --- /dev/null +++ b/mlpf/utils.py @@ -0,0 +1,16 @@ +from pathlib import Path +import datetime +import platform + + +def create_experiment_dir(prefix=None, suffix=None): + if prefix is None: + train_dir = Path("experiments") / datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f") + else: + train_dir = Path("experiments") / (prefix + datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")) + + if suffix is not None: + train_dir = train_dir.with_name(train_dir.name + "." + platform.node()) + + train_dir.mkdir(parents=True) + return str(train_dir) diff --git a/scripts/local_test_pyg.sh b/scripts/local_test_pyg.sh index 91585aed1..a9f157906 100755 --- a/scripts/local_test_pyg.sh +++ b/scripts/local_test_pyg.sh @@ -27,4 +27,4 @@ mkdir -p experiments tfds build mlpf/heptfds/cms_pf/ttbar --manual_dir ./local_test_data -python mlpf/pyg_pipeline.py --config parameters/pyg-config-test.yaml --dataset cms --data_dir ./tensorflow_datasets/ --model-prefix ./experiments/MLPF_test --gpus "" --train --test --make-plots +python mlpf/pyg_pipeline.py --config parameters/pyg-config-test.yaml --dataset cms --data_dir ./tensorflow_datasets/ --prefix MLPF_test --gpus "" --train --test --make-plots