From 432baa8a10f3a60ca758100500f5f264290ec7af Mon Sep 17 00:00:00 2001
From: Iason Krommydas
Date: Tue, 22 Oct 2024 14:54:42 +0200
Subject: [PATCH 1/8] first draft

---
 scripts/run_analysis.py | 26 ++++++++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/scripts/run_analysis.py b/scripts/run_analysis.py
index 5a6970c0..ec7c3267 100644
--- a/scripts/run_analysis.py
+++ b/scripts/run_analysis.py
@@ -63,6 +63,10 @@ def main():
         with gzip.open("/tmp/preprocessed_fileset.json.gz", "wt") as f:
             logger.info("Saving the preprocessed fileset to /tmp/preprocessed_fileset.json.gz")
             json.dump(fileset, f, indent=2)
+    if args.executor == "dask/casa" or args.executor.startswith("tls://"):
+        # use xcache for coffea-casa
+        pass
+
     instance = runner_utils.initialize_class(config, args, fileset)

     if args.port is not None:
@@ -130,6 +134,18 @@ def main():
             ],
         )
         scheduler = "distributed"
+    elif args.executor == "dask/casa":
+        from coffea_casa import CoffeaCasaCluster
+
+        logger.info("Running using CoffeaCasaCluster")
+        cluster = CoffeaCasaCluster(
+            cores=args.cores,
+            memory=args.memory,
+            disk=args.disk,
+            scheduler_options={"port": args.port, "dashboard_address": args.dashboard_address},
+            log_directory=args.log_directory,
+        )
+        scheduler = "distributed"
     elif args.executor == "dask/slurm":
         from dask_jobqueue import SLURMCluster

@@ -180,6 +196,11 @@ def main():
         cluster.scale(args.scaleout)
         logger.info(f"Set up cluster {cluster}")
         client = Client(cluster)
+        if args.executor == "dask/casa":
+            from dask.distributed import PipInstall
+
+            plugin = PipInstall(packages=["https://github.com/ikrommyd/egamma-tnp.git@master"])
+            client.register_worker_plugin(plugin)
         logger.info(f"Set up client {client}")
     if args.executor is not None and (args.executor.startswith("tls://") or args.executor.startswith("tcp://") or args.executor.startswith("ucx://")):
         client = Client(args.executor)
@@ -208,6 +229,11 @@ def main():
     out = runner_utils.process_out(out, args.output)
     logger.info(f"Final output after post-processing:\n{out}")
     logger.info("Finished the E/Gamma Tag and Probe workflow")
+    logger.info("Shutting down the client and cluster if any")
+    if client:
+        client.shutdown()
+    if cluster:
+        cluster.close()


 if __name__ == "__main__":

From 2af462d666262876ab10b457a2f97ee563960bf6 Mon Sep 17 00:00:00 2001
From: Iason Krommydas
Date: Wed, 23 Oct 2024 00:51:51 +0200
Subject: [PATCH 2/8] use xcache for files

---
 scripts/run_analysis.py | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/scripts/run_analysis.py b/scripts/run_analysis.py
index ec7c3267..9323333b 100644
--- a/scripts/run_analysis.py
+++ b/scripts/run_analysis.py
@@ -63,9 +63,18 @@ def main():
         with gzip.open("/tmp/preprocessed_fileset.json.gz", "wt") as f:
             logger.info("Saving the preprocessed fileset to /tmp/preprocessed_fileset.json.gz")
             json.dump(fileset, f, indent=2)
+
     if args.executor == "dask/casa" or args.executor.startswith("tls://"):
         # use xcache for coffea-casa
-        pass
+        xrootd_pfx = "root://"
+        xrd_pfx_len = len(xrootd_pfx)
+        for dataset in fileset.keys():
+            files = fileset[dataset]["files"]
+            newfiles = {}
+            for path, value in files.items():
+                newpath = path.replace(path[xrd_pfx_len : xrd_pfx_len + path[xrd_pfx_len:].find("/store")], "xcache/")
+                newfiles[newpath] = value
+            fileset[dataset]["files"] = newfiles

     instance = runner_utils.initialize_class(config, args, fileset)
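The path rewrite added in PATCH 2/8 swaps the XRootD redirector host in each input file path for the site-local xcache endpoint, so that coffea-casa workers read through the cache. A minimal sketch of what the slicing does, using a made-up example URL rather than one from the real fileset:

    # Illustration only: the input path below is a hypothetical example.
    path = "root://cmsxrootd.fnal.gov//store/data/Run2024/EGamma/NANOAOD/example.root"
    xrootd_pfx = "root://"
    xrd_pfx_len = len(xrootd_pfx)
    # the slice picks out everything between "root://" and the first "/store",
    # i.e. the redirector host plus its trailing slash: "cmsxrootd.fnal.gov/"
    redirector = path[xrd_pfx_len : xrd_pfx_len + path[xrd_pfx_len:].find("/store")]
    newpath = path.replace(redirector, "xcache/")
    assert newpath == "root://xcache//store/data/Run2024/EGamma/NANOAOD/example.root"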
From fe93cdd80e74fe35a1e49e7c09d4f5a5756194d6 Mon Sep 17 00:00:00 2001
From: Iason Krommydas
Date: Wed, 23 Oct 2024 00:52:57 +0200
Subject: [PATCH 3/8] do it before preprocessing

---
 scripts/run_analysis.py | 25 +++++++++++++------------
 1 file changed, 13 insertions(+), 12 deletions(-)

diff --git a/scripts/run_analysis.py b/scripts/run_analysis.py
index 9323333b..d8f04a25 100644
--- a/scripts/run_analysis.py
+++ b/scripts/run_analysis.py
@@ -51,18 +51,6 @@ def main():
     runner_utils.set_binning(runner_utils.load_json(args.binning))
     fileset = runner_utils.load_json(args.fileset)
     logger.info(f"Loaded fileset from {args.fileset}")
-    if args.preprocess:
-        from coffea.dataset_tools import preprocess
-
-        client = Client(dashboard_address=args.dashboard_address)
-        logger.info(f"Preprocessing the fileset with client: {client}")
-        fileset = preprocess(fileset, step_size=100_000, skip_bad_files=True, scheduler=None)[0]
-        logger.info("Done preprocessing the fileset")
-        client.shutdown()
-
-        with gzip.open("/tmp/preprocessed_fileset.json.gz", "wt") as f:
-            logger.info("Saving the preprocessed fileset to /tmp/preprocessed_fileset.json.gz")
-            json.dump(fileset, f, indent=2)
     if args.executor == "dask/casa" or args.executor.startswith("tls://"):
         # use xcache for coffea-casa
         xrootd_pfx = "root://"
         xrd_pfx_len = len(xrootd_pfx)
         for dataset in fileset.keys():
             files = fileset[dataset]["files"]
             newfiles = {}
             for path, value in files.items():
                 newpath = path.replace(path[xrd_pfx_len : xrd_pfx_len + path[xrd_pfx_len:].find("/store")], "xcache/")
                 newfiles[newpath] = value
             fileset[dataset]["files"] = newfiles
+    if args.preprocess:
+        from coffea.dataset_tools import preprocess
+
+        client = Client(dashboard_address=args.dashboard_address)
+        logger.info(f"Preprocessing the fileset with client: {client}")
+        fileset = preprocess(fileset, step_size=100_000, skip_bad_files=True, scheduler=None)[0]
+        logger.info("Done preprocessing the fileset")
+        client.shutdown()
+
+        with gzip.open("/tmp/preprocessed_fileset.json.gz", "wt") as f:
+            logger.info("Saving the preprocessed fileset to /tmp/preprocessed_fileset.json.gz")
+            json.dump(fileset, f, indent=2)
+
     instance = runner_utils.initialize_class(config, args, fileset)

From ed0da584637e7c756b1814f45286a3768f51c929 Mon Sep 17 00:00:00 2001
From: Iason Krommydas
Date: Wed, 23 Oct 2024 02:33:03 +0200
Subject: [PATCH 4/8] also install worker plugin if tls executor

---
 scripts/run_analysis.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scripts/run_analysis.py b/scripts/run_analysis.py
index d8f04a25..ce3f3c59 100644
--- a/scripts/run_analysis.py
+++ b/scripts/run_analysis.py
@@ -206,7 +206,7 @@ def main():
         cluster.scale(args.scaleout)
         logger.info(f"Set up cluster {cluster}")
         client = Client(cluster)
-        if args.executor == "dask/casa":
+        if args.executor == "dask/casa" or args.executor.startswith("tls://"):
             from dask.distributed import PipInstall

             plugin = PipInstall(packages=["https://github.com/ikrommyd/egamma-tnp.git@master"])
             client.register_worker_plugin(plugin)

From a8f3694a5b5a07464253d88d8f8632326c9a2541 Mon Sep 17 00:00:00 2001
From: Iason Krommydas
Date: Wed, 23 Oct 2024 02:47:25 +0200
Subject: [PATCH 5/8] sneak in a little improvement in here

---
 src/egamma_tnp/utils/runner_utils.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/egamma_tnp/utils/runner_utils.py b/src/egamma_tnp/utils/runner_utils.py
index e06cb8e6..79b5c92c 100644
--- a/src/egamma_tnp/utils/runner_utils.py
+++ b/src/egamma_tnp/utils/runner_utils.py
@@ -188,7 +188,7 @@ def save_array_to_parquet(array, output_dir, dataset, subdir, prefix=None, repar
     # Trick to reduce node multiplicity before to_parquet until it's fixed.
     # TODO: remove this when the issue is fixed
-    array = array[array.run > -999.0]
+    array = array[0:]

     # Repartition the array if needed
     if repartition_n:

From bb897a46cdf3a0bec9e57d4c77e4e87b6b724934 Mon Sep 17 00:00:00 2001
From: Iason Krommydas
Date: Wed, 23 Oct 2024 10:22:52 +0200
Subject: [PATCH 6/8] follow dask recommendatons

---
 scripts/run_analysis.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/scripts/run_analysis.py b/scripts/run_analysis.py
index ce3f3c59..0e2c7766 100644
--- a/scripts/run_analysis.py
+++ b/scripts/run_analysis.py
@@ -209,8 +209,8 @@ def main():
         if args.executor == "dask/casa" or args.executor.startswith("tls://"):
             from dask.distributed import PipInstall

-            plugin = PipInstall(packages=["https://github.com/ikrommyd/egamma-tnp.git@master"])
-            client.register_worker_plugin(plugin)
+            plugin = PipInstall(packages=["egamma-tnp@git+https://${TOKEN}@github.com/ikrommyd/egamma-tnp.git@master"])
+            client.register_plugin(plugin)
         logger.info(f"Set up client {client}")
     if args.executor is not None and (args.executor.startswith("tls://") or args.executor.startswith("tcp://") or args.executor.startswith("ucx://")):
         client = Client(args.executor)
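The change in PATCH 6/8 switches to the plugin registration pattern recommended by the dask.distributed documentation: Client.register_plugin supersedes the older Client.register_worker_plugin, and the package spec carries a ${TOKEN} placeholder meant to be filled from an environment variable available to the workers, mirroring the documentation's example for installing from a private Git repository. A minimal sketch, where the scheduler address and the token value are hypothetical placeholders rather than real settings:

    # Sketch only: the scheduler address and TOKEN are hypothetical placeholders.
    from dask.distributed import Client, PipInstall

    client = Client("tls://localhost:8786")
    plugin = PipInstall(packages=["egamma-tnp@git+https://${TOKEN}@github.com/ikrommyd/egamma-tnp.git@master"])
    client.register_plugin(plugin)  # installs the package on current and newly joining workers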
From d3375fbbba6ba49550e9db59ff02e1423669d856 Mon Sep 17 00:00:00 2001
From: Iason Krommydas
Date: Wed, 23 Oct 2024 10:56:42 +0200
Subject: [PATCH 7/8] typo

---
 scripts/run_analysis.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scripts/run_analysis.py b/scripts/run_analysis.py
index 0e2c7766..1c816c2b 100644
--- a/scripts/run_analysis.py
+++ b/scripts/run_analysis.py
@@ -191,7 +191,7 @@ def main():
             scheduler_options={"dashboard_address": args.dashboard_address},
         )
         scheduler = "distributed"
-    elif args.executor is not None and (args.executor.startswith("tls:://") or args.executor.startswith("tcp://") or args.executor.startswith("ucx://")):
+    elif args.executor is not None and (args.executor.startswith("tls://") or args.executor.startswith("tcp://") or args.executor.startswith("ucx://")):
         logger.info(f"Will use dask scheduler at {args.executor}")
     elif args.executor is None:
         logger.info("Running with default dask scheduler")

From c420adba2ed682ee53169e3251a638c3bd492171 Mon Sep 17 00:00:00 2001
From: Iason Krommydas
Date: Wed, 23 Oct 2024 10:59:54 +0200
Subject: [PATCH 8/8] one more fix

---
 scripts/run_analysis.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scripts/run_analysis.py b/scripts/run_analysis.py
index 1c816c2b..0ccd4721 100644
--- a/scripts/run_analysis.py
+++ b/scripts/run_analysis.py
@@ -79,7 +79,7 @@ def main():

     instance = runner_utils.initialize_class(config, args, fileset)

-    if args.port is not None:
+    if args.port is not None and (not args.executor.startswith("tls://") and not args.executor.startswith("tcp://") and not args.executor.startswith("ucx://")):
         if not runner_utils.check_port(args.port):
             logger.error(f"Port {args.port} is occupied in this node. Try another one.")
             raise ValueError(f"Port {args.port} is occupied in this node. Try another one.")