From 607c30eca97700d1ef22e92f3483ce739a97250e Mon Sep 17 00:00:00 2001 From: Dmitry Kalinkin Date: Thu, 19 Dec 2024 03:12:33 -0500 Subject: [PATCH] backwards_ecal: run with dask --- benchmarks/backwards_ecal/Snakefile | 18 ++++ benchmarks/backwards_ecal/backwards_ecal.org | 88 ++++++++++++++------ benchmarks/backwards_ecal/config.yml | 4 +- benchmarks/backwards_ecal/requirements.txt | 5 +- 4 files changed, 85 insertions(+), 30 deletions(-) diff --git a/benchmarks/backwards_ecal/Snakefile b/benchmarks/backwards_ecal/Snakefile index f14fd979..e81671b0 100644 --- a/benchmarks/backwards_ecal/Snakefile +++ b/benchmarks/backwards_ecal/Snakefile @@ -152,6 +152,10 @@ rule backwards_ecal: script="benchmarks/backwards_ecal/backwards_ecal.py", output: directory("results/backwards_ecal/{CAMPAIGN}/") + log: + scheduler=".logs/results/backwards_ecal/{CAMPAIGN}/scheduler.log", + worker=".logs/results/backwards_ecal/{CAMPAIGN}/worker.log", + threads: workflow.cores shell: """ if [[ "{wildcards.CAMPAIGN}" == "local" ]]; then @@ -159,6 +163,20 @@ export PLOT_TITLE="Benchmark simulation" else export PLOT_TITLE="\\textbf{{ePIC}} Simulation {wildcards.CAMPAIGN}" fi + +set -m # monitor mode to prevent lingering processes +cleanup() {{ + echo Cleaning up + kill $WORKER_PID $SCHEDULER_PID +}} +trap cleanup EXIT + +PORT=$RANDOM +dask scheduler --port $PORT 2>{log.scheduler} & +export DASK_SCHEDULER=localhost:$PORT +SCHEDULER_PID=$! +dask worker tcp://$DASK_SCHEDULER --nworkers {threads} --nthreads 1 2>{log.worker} & +WORKER_PID=$! env \ MATPLOTLIBRC={input.matplotlibrc} \ DETECTOR_CONFIG=""" + DETECTOR_CONFIG + """ \ diff --git a/benchmarks/backwards_ecal/backwards_ecal.org b/benchmarks/backwards_ecal/backwards_ecal.org index 6ca1e916..f69d44f0 100644 --- a/benchmarks/backwards_ecal/backwards_ecal.org +++ b/benchmarks/backwards_ecal/backwards_ecal.org @@ -18,12 +18,32 @@ import os from pathlib import Path import awkward as ak +import boost_histogram as bh +import dask_histogram as dh import numpy as np -import vector import uproot -from sklearn.metrics import roc_curve +import vector +#+end_src + +#+begin_src jupyter-python :results silent +from dask.distributed import Client +client = Client(os.environ.get("DASK_SCHEDULER", "localhost:8786")) +#+end_src + +#+begin_src jupyter-python vector.register_awkward() + +from dask.distributed import WorkerPlugin +class VectorImporter(WorkerPlugin): + idempotent=True + + def setup(self, worker): + import vector + + vector.register_awkward() + +client.register_plugin(VectorImporter()) #+end_src * Plotting setup @@ -93,7 +113,7 @@ energies = [ ] filter_name = [ "MCParticles.*", - "*EcalEndcapNClusters*", + "EcalEndcapNClusters.energy", ] pi_eval = {} @@ -105,14 +125,18 @@ def readlist(path): return paths for energy in energies: - pi_eval[energy] = filter_pointing(uproot.concatenate( + pi_eval[energy] = uproot.dask( {path: "events" for path in readlist(INPUT_PATH_FORMAT.format(particle="pi-", energy=energy))}, filter_name=filter_name, - )) - e_eval[energy] = filter_pointing(uproot.concatenate( + steps_per_file=1, + open_files=False, + ).map_partitions(filter_pointing) + e_eval[energy] = uproot.dask( {path: "events" for path in readlist(INPUT_PATH_FORMAT.format(particle="e-", energy=energy))}, filter_name=filter_name, - )) + steps_per_file=1, + open_files=False, + ).map_partitions(filter_pointing) #+end_src ** Energy resolution @@ -125,29 +149,38 @@ axs = np.ravel(np.array(axs)) sigmas_rel_FWHM_cb = {} fractions_below = {} +e_over_p_hist = {} +e_over_p_axis = bh.axis.Regular(101, 0., 1.01) + for ix, energy in enumerate(energies): - energy_value = float(energy.replace("GeV", "").replace("MeV", "e-3")) - clf_label = PLOT_TITLE - def clf(events): - return ak.drop_none(ak.max(events["EcalEndcapNClusters.energy"], axis=-1)) / energy_value - e_pred = clf(e_eval[energy]) + for particle_name, dataset in [("e-", e_eval[energy]), ("pi-", pi_eval[energy])]: + energy_value = float(energy.replace("GeV", "").replace("MeV", "e-3")) + def clf(events): + return ak.drop_none(ak.max(events["EcalEndcapNClusters.energy"], axis=-1)) / energy_value + e_over_p_hist[(particle_name, energy)] = dh.factory( + clf(dataset), + axes=(e_over_p_axis,), + ) +e_over_p_hist = client.gather(client.compute(e_over_p_hist)) + +for ix, energy in enumerate(energies): plt.sca(axs[ix]) - counts, bins, patches = plt.hist(e_pred, weights=np.full_like(e_pred, 1.0 / ak.num(e_pred, axis=0)), bins=np.linspace(0.01, 1.01, 101), label=rf"$e^-$ {clf_label}") + hist = e_over_p_hist[("e-", energy)] + patch = plt.stairs(hist.values() / np.sum(hist.values()), hist.axes[0].edges, label=rf"$e^-$ {PLOT_TITLE}") plt.title(f"{energy}") - e_over_p = (bins[1:] + bins[:-1]) / 2 import scipy.stats def f(x, n, beta, m, loc, scale): return n * scipy.stats.crystalball.pdf(x, beta, m, loc, scale) - p0 = (np.sum(counts[10:]), 2., 3., 0.95, 0.05) + p0 = (np.sum(hist.values()[10:]), 2., 3., 0.95, 0.05) try: import scipy.optimize - par, pcov = scipy.optimize.curve_fit(f, e_over_p[5:], counts[5:], p0=p0, maxfev=10000) + par, pcov = scipy.optimize.curve_fit(f, hist.axes[0].centers[5:], hist.values()[5:], p0=p0, maxfev=10000) except RuntimeError: par = None - plt.plot(e_over_p, f(e_over_p, *par), label=rf"Crystal Ball fit", color="tab:green", lw=0.8) + plt.plot(hist.axes[0].centers, f(hist.axes[0].centers, *par), label=rf"Crystal Ball fit", color="tab:green", lw=0.8) def summarize_fit(par): _, _, _, loc_cb, scale_cb = par @@ -156,19 +189,19 @@ for ix, energy in enumerate(energies): f_prime = lambda x: f(x, *par) - y_max / 2 x_plus, = scipy.optimize.root(f_prime, loc_cb + scale_cb).x x_minus, = scipy.optimize.root(f_prime, loc_cb - scale_cb).x - plt.axvline(x_minus, ls="--", lw=0.75, color=patches[0].get_facecolor(), label=r"$\mu - $FWHM") - plt.axvline(x_plus, ls=":", lw=0.75, color=patches[0].get_facecolor(), label=r"$\mu + $FWHM") + plt.axvline(x_minus, ls="--", lw=0.75, color=patch.get_facecolor(), label=r"$\mu - $FWHM") + plt.axvline(x_plus, ls=":", lw=0.75, color=patch.get_facecolor(), label=r"$\mu + $FWHM") fwhm = (x_plus - x_minus) / loc_cb sigma_rel_FWHM_cb = fwhm / 2 / np.sqrt(2 * np.log(2)) cutoff_x = loc_cb - fwhm - fraction_below = np.sum(counts[e_over_p < cutoff_x]) / ak.num(e_pred, axis=0) + fraction_below = np.sum(hist.values()[hist.axes[0].centers < cutoff_x]) / np.sum(hist.values()) return sigma_rel_FWHM_cb, fraction_below sigma_rel_FWHM_cb, fraction_below = summarize_fit(par) - sigmas_rel_FWHM_cb.setdefault(clf_label, {})[energy] = sigma_rel_FWHM_cb - fractions_below.setdefault(clf_label, {})[energy] = fraction_below + sigmas_rel_FWHM_cb.setdefault(PLOT_TITLE, {})[energy] = sigma_rel_FWHM_cb + fractions_below.setdefault(PLOT_TITLE, {})[energy] = fraction_below plt.legend() plt.xlabel("$E/p$", loc="right") @@ -234,6 +267,7 @@ plt.show() ** Pion rejection #+begin_src jupyter-python +""" fig, axs = plt.subplots(2, 4, sharex=True, sharey=True, figsize=(15, 6)) fig_log, axs_log = plt.subplots(2, 4, sharex=True, sharey=True, figsize=(15, 6)) fig_roc, axs_roc = plt.subplots(2, 4, sharex=True, sharey=True, figsize=(15, 6)) @@ -246,7 +280,6 @@ rocs = {} for ix, energy in enumerate(energies): energy_value = float(energy.replace("GeV", "").replace("MeV", "e-3")) - clf_label = PLOT_TITLE def clf(events): return ak.drop_none(ak.max(events["EcalEndcapNClusters.energy"], axis=-1)) / energy_value e_pred = clf(e_eval[energy]) @@ -254,8 +287,8 @@ for ix, energy in enumerate(energies): for do_log, ax in [(False, axs[ix]), (True, axs_log[ix])]: plt.sca(ax) - plt.hist(e_pred, weights=np.full_like(e_pred, 1.0 / ak.num(e_pred, axis=0)), bins=np.linspace(0., 1.01, 101), label=rf"$e^-$ {clf_label}") - plt.hist(pi_pred, weights=np.full_like(pi_pred, 1.0 / ak.num(pi_pred, axis=0)), bins=np.linspace(0., 1.01, 101), label=rf"$\pi^-$ {clf_label}", histtype="step") + plt.hist(e_pred, weights=np.full_like(e_pred, 1.0 / ak.num(e_pred, axis=0)), bins=np.linspace(0., 1.01, 101), label=rf"$e^-$ {PLOT_TILE}") + plt.hist(pi_pred, weights=np.full_like(pi_pred, 1.0 / ak.num(pi_pred, axis=0)), bins=np.linspace(0., 1.01, 101), label=rf"$\pi^-$ {PLOT_TITLE}", histtype="step") plt.title(f"{energy}") plt.legend() plt.xlabel("Classifier output") @@ -274,8 +307,8 @@ for ix, energy in enumerate(energies): def interp(eff): return np.interp(eff, tpr, fpr) return interp - rocs.setdefault(clf_label, {})[energy] = mk_interp(tpr, fpr) - plt.plot(tpr[cond] * 100, 1 / fpr[cond], label=f"{clf_label}") + rocs.setdefault(PLOT_TITLE, {})[energy] = mk_interp(tpr, fpr) + plt.plot(tpr[cond] * 100, 1 / fpr[cond], label=f"{PLOT_TILE}") plt.yscale("log") plt.title(f"{energy}") plt.legend(loc="lower left") @@ -309,4 +342,5 @@ plt.ylabel("Pion rejection at 95%") plt.savefig(output_dir / f"pion_rej.pdf", bbox_inches="tight") plt.savefig(output_dir / f"pion_rej.png", bbox_inches="tight") plt.show() +""" #+end_src diff --git a/benchmarks/backwards_ecal/config.yml b/benchmarks/backwards_ecal/config.yml index 68a183e5..a4e19bd7 100644 --- a/benchmarks/backwards_ecal/config.yml +++ b/benchmarks/backwards_ecal/config.yml @@ -26,7 +26,7 @@ bench:backwards_ecal: script: - export PYTHONUSERBASE=$LOCAL_DATA_PATH/deps - pip install -r benchmarks/backwards_ecal/requirements.txt - - snakemake $SNAKEMAKE_FLAGS --cores 1 results/backwards_ecal/local + - snakemake $SNAKEMAKE_FLAGS --cores 5 results/backwards_ecal/local bench:backwards_ecal_campaigns: extends: .det_benchmark @@ -36,7 +36,7 @@ bench:backwards_ecal_campaigns: script: - export PYTHONUSERBASE=$LOCAL_DATA_PATH/deps - pip install -r benchmarks/backwards_ecal/requirements.txt - - snakemake $SNAKEMAKE_FLAGS --cores 1 results/backwards_ecal/24.10.1 + - snakemake $SNAKEMAKE_FLAGS --cores 5 results/backwards_ecal/24.10.1 collect_results:backwards_ecal: extends: .det_benchmark diff --git a/benchmarks/backwards_ecal/requirements.txt b/benchmarks/backwards_ecal/requirements.txt index 3d0f6336..33e6f29a 100644 --- a/benchmarks/backwards_ecal/requirements.txt +++ b/benchmarks/backwards_ecal/requirements.txt @@ -1,4 +1,7 @@ awkward >= 2.4.0 -scikit-learn +boost_histogram +dask >= 2024 +distributed >= 2024 +dask_histogram uproot >= 5.2.0 vector