Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Updates to improve experience on SWAN #51

Merged
merged 6 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 74 additions & 38 deletions analyses/cms-open-data-ttbar/analysis.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
import argparse
import multiprocessing
from pathlib import Path
from time import time
from typing import Optional, Tuple
from typing import Tuple

from distributed import Client, get_worker, LocalCluster, SSHCluster
import ml
from plotting import save_ml_plots, save_plots
import ROOT
from utils import (
AGCInput,
AGCResult,
postprocess_results,
retrieve_inputs,
save_histos,
)

# Using https://atlas-groupdata.web.cern.ch/atlas-groupdata/dev/AnalysisTop/TopDataPreparation/XSection-MC15-13TeV.data

Check failure on line 19 in analyses/cms-open-data-ttbar/analysis.py

View workflow job for this annotation

GitHub Actions / linter

Ruff (I001)

analyses/cms-open-data-ttbar/analysis.py:1:1: I001 Import block is un-sorted or un-formatted
# as a reference. Values are in pb.
XSEC_INFO = {
"ttbar": 396.87 + 332.97, # nonallhad + allhad, keep same x-sec for all
Expand Down Expand Up @@ -66,15 +66,19 @@
The default is `mt`, i.e. multi-thread execution.
If dask-ssh, a list of worker node hostnames to connect to should be provided via the --nodes option.""",
default="mt",
choices=["mt", "dask-local", "dask-ssh"],
choices=["mt", "dask-local", "dask-ssh", "dask-remote"],
)
p.add_argument(
"--scheduler-address",
help="Full address of the Dask scheduler, passed as argument to the distributed.Client object. If this argument is provided, the 'dask-remote' scheduler must be chosen, and it is a required argument in such case.",
)
p.add_argument(
"--ncores",
"-c",
help=(
"Number of cores to use. In case of distributed execution this is the amount of cores per node."
),
default = multiprocessing.cpu_count(),
default=multiprocessing.cpu_count(),
type=int,
)
p.add_argument(
Expand All @@ -91,7 +95,7 @@
return p.parse_args()


def create_dask_client(scheduler: str, ncores: int, hosts: str) -> Client:
def create_dask_client(scheduler: str, ncores: int, hosts: str, scheduler_address: str) -> Client:
"""Create a Dask distributed client."""
if scheduler == "dask-local":
lc = LocalCluster(n_workers=ncores, threads_per_worker=1, processes=True)
Expand All @@ -109,10 +113,14 @@
)
return Client(sshc)

if scheduler == "dask-remote":
return Client(scheduler_address)

raise ValueError(
f"Unexpected scheduling mode '{scheduler}'. Valid modes are ['dask-local', 'dask-ssh']."
f"Unexpected scheduling mode '{scheduler}'. Valid modes are ['dask-local', 'dask-ssh', 'dask-remote']."
)


def define_trijet_mass(df: ROOT.RDataFrame) -> ROOT.RDataFrame:
"""Add the trijet_mass observable to the dataframe after applying the appropriate selections."""

Expand Down Expand Up @@ -248,9 +256,17 @@
nominal_histo = df.Histo1D(histo_model, observable, "Weights")

if variation == "nominal":
results.append(AGCResult(nominal_histo, region, process, variation, nominal_histo, should_vary=True))
results.append(
AGCResult(
nominal_histo, region, process, variation, nominal_histo, should_vary=True
)
)
else:
results.append(AGCResult(nominal_histo, region, process, variation, nominal_histo, should_vary=False))
results.append(
AGCResult(
nominal_histo, region, process, variation, nominal_histo, should_vary=False
)
)
print(f"Booked histogram {histo_model.fName}")

ml_results: list[AGCResult] = []
Expand All @@ -275,11 +291,20 @@

if variation == "nominal":
ml_results.append(
AGCResult(nominal_histo, feature.name, process, variation, nominal_histo, should_vary=True)
AGCResult(
nominal_histo, feature.name, process, variation, nominal_histo, should_vary=True
)
)
else:
ml_results.append(
AGCResult(nominal_histo, feature.name, process, variation, nominal_histo, should_vary=False)
AGCResult(
nominal_histo,
feature.name,
process,
variation,
nominal_histo,
should_vary=False,
)
)
print(f"Booked histogram {histo_model.fName}")

Expand All @@ -299,20 +324,21 @@
# must be local execution
cpp_source = "helpers.h"

ROOT.gSystem.CompileMacro(str(cpp_source), "kO")
ROOT.gInterpreter.Declare(f'#include "{str(cpp_source)}"')


def run_mt(
program_start: float,
args: argparse.Namespace,
inputs: list[AGCInput],
results: list[AGCResult],
ml_results: list[AGCResult]) -> None:
program_start: float,
args: argparse.Namespace,
inputs: list[AGCInput],
results: list[AGCResult],
ml_results: list[AGCResult],
) -> None:
ROOT.EnableImplicitMT(args.ncores)
print(f"Number of threads: {ROOT.GetThreadPoolSize()}")
load_cpp()
if args.inference:
ml.load_cpp()
ml.load_cpp()

for input in inputs:
df = ROOT.RDataFrame("Events", input.paths)
Expand All @@ -326,39 +352,45 @@
if r.should_vary:
r.histo = ROOT.RDF.Experimental.VariationsFor(r.histo)

print(
f"Building the computation graphs took {time() - program_start:.2f} seconds")
print(f"Building the computation graphs took {time() - program_start:.2f} seconds")

# Run the event loops for all processes and variations here
run_graphs_start = time()
ROOT.RDF.RunGraphs([r.nominal_histo for r in results + ml_results])

print(
f"Executing the computation graphs took {time() - run_graphs_start:.2f} seconds")
print(f"Executing the computation graphs took {time() - run_graphs_start:.2f} seconds")


def run_distributed(
program_start: float,
args: argparse.Namespace,
inputs: list[AGCInput],
results: list[AGCResult],
ml_results: list[AGCResult]) -> None:
program_start: float,
args: argparse.Namespace,
inputs: list[AGCInput],
results: list[AGCResult],
ml_results: list[AGCResult],
) -> None:
if args.inference:
ROOT.RDF.Experimental.Distributed.initialize(load_cpp)
if args.inference:
ROOT.RDF.Experimental.Distributed.initialize(ml.load_cpp)

def ml_init():
load_cpp()
ml.load_cpp()

ROOT.RDF.Experimental.Distributed.initialize(ml_init)
else:
ROOT.RDF.Experimental.Distributed.initialize(load_cpp)

with create_dask_client(args.scheduler, args.ncores, args.hosts) as client:
scheduler_address = args.scheduler_address if args.scheduler_address else ""
with create_dask_client(args.scheduler, args.ncores, args.hosts, scheduler_address) as client:
for input in inputs:
df = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame(
"Events", input.paths, daskclient=client, npartitions=args.npartitions
)
df._headnode.backend.distribute_unique_paths(
[
"helpers.h",
"ml_helpers.cpp",
"ml_helpers.h",
"ml.py",
"models/bdt_even.root",
"models/bdt_odd.root",
]
)
hist_list, ml_hist_list = book_histos(
Expand All @@ -369,19 +401,15 @@

for r in results + ml_results:
if r.should_vary:
r.histo = ROOT.RDF.Experimental.Distributed.VariationsFor(
r.histo)
r.histo = ROOT.RDF.Experimental.Distributed.VariationsFor(r.histo)

print(
f"Building the computation graphs took {time() - program_start:.2f} seconds")
print(f"Building the computation graphs took {time() - program_start:.2f} seconds")

# Run the event loops for all processes and variations here
run_graphs_start = time()
ROOT.RDF.Experimental.Distributed.RunGraphs(
[r.nominal_histo for r in results + ml_results])
ROOT.RDF.Experimental.Distributed.RunGraphs([r.nominal_histo for r in results + ml_results])

print(
f"Executing the computation graphs took {time() - run_graphs_start:.2f} seconds")
print(f"Executing the computation graphs took {time() - run_graphs_start:.2f} seconds")


def main() -> None:
Expand All @@ -407,6 +435,14 @@
if args.scheduler == "mt":
run_mt(program_start, args, inputs, results, ml_results)
else:
if args.scheduler == "dask-remote" and not args.scheduler_address:
raise ValueError(
"'dask-remote' option chosen but no address provided for the scheduler. Provide it with `--scheduler-address`."
)
if args.scheduler_address and args.scheduler != "dask-remote":
raise ValueError(
f"An address of a Dask scheduler was provided but the chosen scheduler is '{args.scheduler}'. The 'dask-remote' scheduler must be chosen if an address is provided."
)
run_distributed(program_start, args, inputs, results, ml_results)

results = postprocess_results(results)
Expand Down
48 changes: 28 additions & 20 deletions analyses/cms-open-data-ttbar/ml.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from dataclasses import dataclass
from pathlib import Path
from typing import Tuple

import ROOT

from distributed import get_worker

# histogram bin lower limit to use for each ML input feature

Check failure on line 9 in analyses/cms-open-data-ttbar/ml.py

View workflow job for this annotation

GitHub Actions / linter

Ruff (I001)

analyses/cms-open-data-ttbar/ml.py:1:1: I001 Import block is un-sorted or un-formatted
bin_low = [0, 0, 0, 0, 50, 50, 50, 50, 25, 25, 25, 25, 0, 0, 0, 0, -1, -1, -1, -1]

# histogram bin upper limit to use for each ML input feature
Expand Down Expand Up @@ -72,38 +75,41 @@
for i in range(len(feature_names))
]


def load_cpp(max_n_jets=6):
# the default value of max_n_jets is the same as in the reference implementation
# https://github.com/iris-hep/analysis-grand-challenge
ROOT.gSystem.CompileMacro("ml_helpers.cpp", "kO")

try:
# when using distributed RDataFrame the header is copied to the local_directory
# of every worker (via `distribute_unique_paths`)
localdir = get_worker().local_directory
cpp_source = Path(localdir) / "ml_helpers.h"
model_even_path = Path(localdir) / "bdt_even.root"
model_odd_path = Path(localdir) / "bdt_odd.root"
except ValueError:
# must be local execution
cpp_source = "ml_helpers.h"
model_even_path = "models/bdt_even.root"
model_odd_path = "models/bdt_odd.root"

ROOT.gInterpreter.Declare(f'#include "{cpp_source}"')
# Initialize FastForest models.
# Our BDT models have 20 input features according to the AGC documentation
# https://agc.readthedocs.io/en/latest/taskbackground.html#machine-learning-component

ROOT.gInterpreter.Declare(
# **Conditional directives used to avoid redefinition error during distributed computing**
# Note:
# * moving all stuff in `Declare` to `ml_helpers.cpp` cancels the necessity of using `ifndef`
# * coming soon feature is `gInterpreter.Declare` with automatic header guards
# https://indico.fnal.gov/event/23628/contributions/240608/attachments/154873/201557/distributed_RDF_padulano_ROOT_workshop_2022.pdf
"""
ROOT.gInterpreter.ProcessLine(
f"""
#ifndef AGC_MODELS
#define AGC_MODELS

TMVA::Experimental::RBDT feven("feven", "models/bdt_even.root");
TMVA::Experimental::RBDT fodd("fodd", "models/bdt_odd.root");

""".__add__(
f"""
size_t max_n_jets = {max_n_jets};
std::map<int, std::vector<ROOT::RVecI>> permutations = get_permutations_dict(max_n_jets);

const static TMVA::Experimental::RBDT model_even{{"feven", "{model_even_path}"}};
const static TMVA::Experimental::RBDT model_odd{{"fodd", "{model_odd_path}"}};
const static std::size_t max_n_jets = {max_n_jets};
const static auto permutations = get_permutations_dict(max_n_jets);
#endif
"""
)
)


def define_features(df: ROOT.RDataFrame) -> ROOT.RDataFrame:
return df.Define(
"features",
Expand All @@ -128,6 +134,7 @@
""",
)


def predict_proba(df: ROOT.RDataFrame) -> ROOT.RDataFrame:
"""get probability scores for every permutation in event"""

Expand All @@ -139,11 +146,12 @@
"proba",
"""
bool is_even = (event % 2 == 0);
const auto& model = (is_even) ? fodd : feven;
const auto& model = (is_even) ? model_odd : model_even;
return inference(features, model);
""",
)


def infer_output_ml_features(df: ROOT.RDataFrame) -> ROOT.RDataFrame:
"""
Choose for each feature the best candidate with the highest probability score.
Expand Down
Loading