diff --git a/analyses/cms-open-data-ttbar/analysis.py b/analyses/cms-open-data-ttbar/analysis.py
index 38e7639..baafae6 100644
--- a/analyses/cms-open-data-ttbar/analysis.py
+++ b/analyses/cms-open-data-ttbar/analysis.py
@@ -2,7 +2,7 @@
 import multiprocessing
 from pathlib import Path
 from time import time
-from typing import Optional, Tuple
+from typing import Tuple
 
 from distributed import Client, get_worker, LocalCluster, SSHCluster
 import ml
@@ -66,7 +66,11 @@ def parse_args() -> argparse.Namespace:
         The default is `mt`, i.e. multi-thread execution. If dask-ssh, a list of
         worker node hostnames to connect to should be provided via the --nodes option.""",
         default="mt",
-        choices=["mt", "dask-local", "dask-ssh"],
+        choices=["mt", "dask-local", "dask-ssh", "dask-remote"],
+    )
+    p.add_argument(
+        "--scheduler-address",
+        help="Full address of the Dask scheduler, passed as argument to the distributed.Client object. Required by (and only valid with) the 'dask-remote' scheduler.",
     )
     p.add_argument(
         "--ncores",
@@ -74,7 +78,7 @@ def parse_args() -> argparse.Namespace:
         help=(
             "Number of cores to use. In case of distributed execution this is the amount of cores per node."
         ),
-        default = multiprocessing.cpu_count(),
+        default=multiprocessing.cpu_count(),
         type=int,
     )
     p.add_argument(
@@ -91,7 +95,7 @@
     return p.parse_args()
 
 
-def create_dask_client(scheduler: str, ncores: int, hosts: str) -> Client:
+def create_dask_client(scheduler: str, ncores: int, hosts: str, scheduler_address: str) -> Client:
    """Create a Dask distributed client."""
     if scheduler == "dask-local":
         lc = LocalCluster(n_workers=ncores, threads_per_worker=1, processes=True)
@@ -109,10 +113,14 @@ def create_dask_client(scheduler: str, ncores: int, hosts: str) -> Client:
         )
         return Client(sshc)
 
+    if scheduler == "dask-remote":
+        return Client(scheduler_address)
+
     raise ValueError(
-        f"Unexpected scheduling mode '{scheduler}'. Valid modes are ['dask-local', 'dask-ssh']."
+        f"Unexpected scheduling mode '{scheduler}'. Valid modes are ['dask-local', 'dask-ssh', 'dask-remote']."
     )
 
+
 def define_trijet_mass(df: ROOT.RDataFrame) -> ROOT.RDataFrame:
     """Add the trijet_mass observable to the dataframe after applying the appropriate selections."""
 
@@ -248,9 +256,17 @@ def book_histos(
     nominal_histo = df.Histo1D(histo_model, observable, "Weights")
 
     if variation == "nominal":
-        results.append(AGCResult(nominal_histo, region, process, variation, nominal_histo, should_vary=True))
+        results.append(
+            AGCResult(
+                nominal_histo, region, process, variation, nominal_histo, should_vary=True
+            )
+        )
     else:
-        results.append(AGCResult(nominal_histo, region, process, variation, nominal_histo, should_vary=False))
+        results.append(
+            AGCResult(
+                nominal_histo, region, process, variation, nominal_histo, should_vary=False
+            )
+        )
     print(f"Booked histogram {histo_model.fName}")
 
     ml_results: list[AGCResult] = []
@@ -275,11 +291,20 @@ def book_histos(
 
         if variation == "nominal":
             ml_results.append(
-                AGCResult(nominal_histo, feature.name, process, variation, nominal_histo, should_vary=True)
+                AGCResult(
+                    nominal_histo, feature.name, process, variation, nominal_histo, should_vary=True
+                )
             )
         else:
             ml_results.append(
-                AGCResult(nominal_histo, feature.name, process, variation, nominal_histo, should_vary=False)
+                AGCResult(
+                    nominal_histo,
+                    feature.name,
+                    process,
+                    variation,
+                    nominal_histo,
+                    should_vary=False,
+                )
             )
         print(f"Booked histogram {histo_model.fName}")
 
@@ -299,20 +324,21 @@ def load_cpp():
         # must be local execution
         cpp_source = "helpers.h"
 
-    ROOT.gSystem.CompileMacro(str(cpp_source), "kO")
+    ROOT.gInterpreter.Declare(f'#include "{str(cpp_source)}"')
 
 
 def run_mt(
-        program_start: float,
-        args: argparse.Namespace,
-        inputs: list[AGCInput],
-        results: list[AGCResult],
-        ml_results: list[AGCResult]) -> None:
+    program_start: float,
+    args: argparse.Namespace,
+    inputs: list[AGCInput],
+    results: list[AGCResult],
+    ml_results: list[AGCResult],
+) -> None:
     ROOT.EnableImplicitMT(args.ncores)
     print(f"Number of threads: {ROOT.GetThreadPoolSize()}")
     load_cpp()
     if args.inference:
-      ml.load_cpp()
+        ml.load_cpp()
 
     for input in inputs:
         df = ROOT.RDataFrame("Events", input.paths)
@@ -326,31 +352,34 @@ def run_mt(
         if r.should_vary:
             r.histo = ROOT.RDF.Experimental.VariationsFor(r.histo)
 
-    print(
-        f"Building the computation graphs took {time() - program_start:.2f} seconds")
+    print(f"Building the computation graphs took {time() - program_start:.2f} seconds")
 
     # Run the event loops for all processes and variations here
     run_graphs_start = time()
     ROOT.RDF.RunGraphs([r.nominal_histo for r in results + ml_results])
 
-    print(
-        f"Executing the computation graphs took {time() - run_graphs_start:.2f} seconds")
+    print(f"Executing the computation graphs took {time() - run_graphs_start:.2f} seconds")
 
 
 def run_distributed(
-        program_start: float,
-        args: argparse.Namespace,
-        inputs: list[AGCInput],
-        results: list[AGCResult],
-        ml_results: list[AGCResult]) -> None:
+    program_start: float,
+    args: argparse.Namespace,
+    inputs: list[AGCInput],
+    results: list[AGCResult],
+    ml_results: list[AGCResult],
+) -> None:
     if args.inference:
-        ROOT.RDF.Experimental.Distributed.initialize(load_cpp)
-    if args.inference:
-        ROOT.RDF.Experimental.Distributed.initialize(ml.load_cpp)
+
+        def ml_init():
+            load_cpp()
+            ml.load_cpp()
+
+        ROOT.RDF.Experimental.Distributed.initialize(ml_init)
     else:
         ROOT.RDF.Experimental.Distributed.initialize(load_cpp)
 
-    with create_dask_client(args.scheduler, args.ncores, args.hosts) as client:
+    scheduler_address = args.scheduler_address if args.scheduler_address else ""
+    with create_dask_client(args.scheduler, args.ncores, args.hosts, scheduler_address) as client:
         for input in inputs:
             df = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame(
                 "Events", input.paths, daskclient=client, npartitions=args.npartitions
@@ -358,7 +387,10 @@ def run_distributed(
         df._headnode.backend.distribute_unique_paths(
             [
                 "helpers.h",
-                "ml_helpers.cpp",
+                "ml_helpers.h",
+                "ml.py",
+                "models/bdt_even.root",
+                "models/bdt_odd.root",
             ]
         )
         hist_list, ml_hist_list = book_histos(
@@ -369,19 +401,15 @@ def run_distributed(
 
     for r in results + ml_results:
         if r.should_vary:
-            r.histo = ROOT.RDF.Experimental.Distributed.VariationsFor(
-                r.histo)
+            r.histo = ROOT.RDF.Experimental.Distributed.VariationsFor(r.histo)
 
-    print(
-        f"Building the computation graphs took {time() - program_start:.2f} seconds")
+    print(f"Building the computation graphs took {time() - program_start:.2f} seconds")
 
     # Run the event loops for all processes and variations here
     run_graphs_start = time()
-    ROOT.RDF.Experimental.Distributed.RunGraphs(
-        [r.nominal_histo for r in results + ml_results])
+    ROOT.RDF.Experimental.Distributed.RunGraphs([r.nominal_histo for r in results + ml_results])
 
-    print(
-        f"Executing the computation graphs took {time() - run_graphs_start:.2f} seconds")
+    print(f"Executing the computation graphs took {time() - run_graphs_start:.2f} seconds")
 
 
 def main() -> None:
@@ -407,6 +435,14 @@
     if args.scheduler == "mt":
         run_mt(program_start, args, inputs, results, ml_results)
     else:
+        if args.scheduler == "dask-remote" and not args.scheduler_address:
+            raise ValueError(
+                "The 'dask-remote' scheduler was chosen but no scheduler address was provided. Provide one with `--scheduler-address`."
+            )
+        if args.scheduler_address and args.scheduler != "dask-remote":
+            raise ValueError(
+                f"A Dask scheduler address was provided but the chosen scheduler is '{args.scheduler}'. Choose 'dask-remote' when providing an address."
+            )
         run_distributed(program_start, args, inputs, results, ml_results)
 
     results = postprocess_results(results)
diff --git a/analyses/cms-open-data-ttbar/ml.py b/analyses/cms-open-data-ttbar/ml.py
index b9dd6ef..bee9240 100644
--- a/analyses/cms-open-data-ttbar/ml.py
+++ b/analyses/cms-open-data-ttbar/ml.py
@@ -1,8 +1,11 @@
 from dataclasses import dataclass
+from pathlib import Path
 from typing import Tuple
 
 import ROOT
 
+from distributed import get_worker
+
 # histogram bin lower limit to use for each ML input feature
 bin_low = [0, 0, 0, 0, 50, 50, 50, 50, 25, 25, 25, 25, 0, 0, 0, 0, -1, -1, -1, -1]
 
@@ -72,38 +75,41 @@ class MLHistoConf:
     for i in range(len(feature_names))
 ]
 
+
 def load_cpp(max_n_jets=6):
     # the default value of max_n_jets is the same as in the reference implementation
     # https://github.com/iris-hep/analysis-grand-challenge
 
-    ROOT.gSystem.CompileMacro("ml_helpers.cpp", "kO")
-
+    try:
+        # when using distributed RDataFrame the header is copied to the local_directory
+        # of every worker (via `distribute_unique_paths`)
+        localdir = get_worker().local_directory
+        cpp_source = Path(localdir) / "ml_helpers.h"
+        model_even_path = Path(localdir) / "bdt_even.root"
+        model_odd_path = Path(localdir) / "bdt_odd.root"
+    except ValueError:
+        # must be local execution
+        cpp_source = "ml_helpers.h"
+        model_even_path = "models/bdt_even.root"
+        model_odd_path = "models/bdt_odd.root"
+
+    ROOT.gInterpreter.Declare(f'#include "{cpp_source}"')
     # Initialize FastForest models.
     # Our BDT models have 20 input features according to the AGC documentation
     # https://agc.readthedocs.io/en/latest/taskbackground.html#machine-learning-component
-    ROOT.gInterpreter.Declare(
-        # **Conditional directives used to avoid redefinition error during distributed computing**
-        # Note:
-        # * moving all stuff in `Declare` to `ml_helpers.cpp` cancels the necessity of using `ifndef`
-        # * coming soon feature is `gInterpreter.Declare` with automatic header guards
-        # https://indico.fnal.gov/event/23628/contributions/240608/attachments/154873/201557/distributed_RDF_padulano_ROOT_workshop_2022.pdf
-        """
+    ROOT.gInterpreter.ProcessLine(
+        f"""
         #ifndef AGC_MODELS
         #define AGC_MODELS
-
-        TMVA::Experimental::RBDT feven("feven", "models/bdt_even.root");
-        TMVA::Experimental::RBDT fodd("fodd", "models/bdt_odd.root");
-
-        """.__add__(
-            f"""
-            size_t max_n_jets = {max_n_jets};
-            std::map<int, std::vector<std::vector<size_t>>> permutations = get_permutations_dict(max_n_jets);
-
+        const static TMVA::Experimental::RBDT model_even{{"feven", "{model_even_path}"}};
+        const static TMVA::Experimental::RBDT model_odd{{"fodd", "{model_odd_path}"}};
+        const static std::size_t max_n_jets = {max_n_jets};
+        const static auto permutations = get_permutations_dict(max_n_jets);
         #endif
         """
-        )
     )
 
+
 def define_features(df: ROOT.RDataFrame) -> ROOT.RDataFrame:
     return df.Define(
         "features",
@@ -128,6 +134,7 @@ def define_features(df: ROOT.RDataFrame) -> ROOT.RDataFrame:
         """,
     )
 
+
 def predict_proba(df: ROOT.RDataFrame) -> ROOT.RDataFrame:
     """get probability scores for every permutation in event"""
 
@@ -139,11 +146,12 @@ def predict_proba(df: ROOT.RDataFrame) -> ROOT.RDataFrame:
         "proba",
         """
         bool is_even = (event % 2 == 0);
-        const auto& model = (is_even) ? fodd : feven;
+        const auto& model = (is_even) ? model_odd : model_even;
         return inference(features, model);
         """,
     )
 
+
 def infer_output_ml_features(df: ROOT.RDataFrame) -> ROOT.RDataFrame:
     """
     Choose for each feature the best candidate with the highest probability score.
diff --git a/analyses/cms-open-data-ttbar/ml_helpers.cpp b/analyses/cms-open-data-ttbar/ml_helpers.h
similarity index 100%
rename from analyses/cms-open-data-ttbar/ml_helpers.cpp
rename to analyses/cms-open-data-ttbar/ml_helpers.h
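Usage note (not part of the patch): the new 'dask-remote' mode makes the analysis attach to an already-deployed Dask scheduler instead of spawning its own cluster. A minimal sketch of what the new code path in create_dask_client boils down to, with an illustrative placeholder address for a scheduler started elsewhere (e.g. via `dask-scheduler` on a login node):

    # Hypothetical standalone example; "tcp://127.0.0.1:8786" is a placeholder,
    # not an address used anywhere in this repository.
    from distributed import Client

    # Same call create_dask_client makes when scheduler == "dask-remote"
    client = Client("tcp://127.0.0.1:8786")
    # Quick sanity check that the scheduler answered
    print(client.scheduler_info()["address"])

which corresponds to invoking the analysis as, for example:

    python analysis.py --scheduler dask-remote --scheduler-address tcp://127.0.0.1:8786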