From efa0751fdb63a5d2a37f8b395d5f8abc7b2ca311 Mon Sep 17 00:00:00 2001 From: Marin Manuel Date: Sat, 29 Jun 2024 11:22:19 -0400 Subject: [PATCH 01/23] added option to pass extra arguments to `sbatch` when using `run_sorter_jobs` --- src/spikeinterface/sorters/launcher.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/spikeinterface/sorters/launcher.py b/src/spikeinterface/sorters/launcher.py index c7127226b0..9315b76ac7 100644 --- a/src/spikeinterface/sorters/launcher.py +++ b/src/spikeinterface/sorters/launcher.py @@ -26,14 +26,14 @@ joblib=dict(n_jobs=-1, backend="loky"), processpoolexecutor=dict(max_workers=2, mp_context=None), dask=dict(client=None), - slurm=dict(tmp_script_folder=None, cpus_per_task=1, mem="1G"), + slurm=dict(tmp_script_folder=None, sbatch_args=dict(cpus_per_task=1, mem="1G")), ) _implemented_engine = list(_default_engine_kwargs.keys()) -def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=False): +def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=False): """ Run several :py:func:`run_sorter()` sequentially or in parallel given a list of jobs. @@ -55,11 +55,10 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal Where *blocking* means that this function is blocking until the results are returned. This is in opposition to *asynchronous*, where the function returns `None` almost immediately (aka non-blocking), - but the results must be retrieved by hand when jobs are finished. No mechanisim is provided here to be know - when jobs are finish. + but the results must be retrieved by hand when jobs are finished. No mechanism is provided here to know + when jobs are finished. In this *asynchronous* case, the :py:func:`~spikeinterface.sorters.read_sorter_folder()` helps to retrieve individual results. - Parameters ---------- job_list : list of dict @@ -68,10 +67,12 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal The engine to run the list. * "loop" : a simple loop. This engine is engine_kwargs : dict + In the case of engine="slum", arguments to sbatch can be passed via sbatch_args, which is a dictionary whose + keys correspond to the --args to be passed to sbatch. return_output : bool, dfault False Return a sortings or None. 
- This also overwrite kwargs in in run_sorter(with_sorting=True/False) + This also overwrite kwargs in run_sorter(with_sorting=True/False) Returns ------- @@ -81,6 +82,7 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal assert engine in _implemented_engine, f"engine must be in {_implemented_engine}" + engine_kwargs = {} if None else engine_kwargs engine_kwargs_ = dict() engine_kwargs_.update(_default_engine_kwargs[engine]) engine_kwargs_.update(engine_kwargs) @@ -180,7 +182,7 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal ) f.write(slurm_script) os.fchmod(f.fileno(), mode=stat.S_IRWXU) - + sbatch_args = ' '.join(['--{k}={v}' for k, v in sbatch_args.items()] subprocess.Popen(["sbatch", str(script_name.absolute()), f"-cpus-per-task={cpus_per_task}", f"-mem={mem}"]) return out From 4bdf244705bc8a5d8792c7c50bd8eacf0259050e Mon Sep 17 00:00:00 2001 From: Marin Manuel Date: Sat, 29 Jun 2024 11:22:19 -0400 Subject: [PATCH 02/23] added option to pass extra arguments to `sbatch` when using `run_sorter_jobs` --- src/spikeinterface/sorters/launcher.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/spikeinterface/sorters/launcher.py b/src/spikeinterface/sorters/launcher.py index c7127226b0..53cf0aed71 100644 --- a/src/spikeinterface/sorters/launcher.py +++ b/src/spikeinterface/sorters/launcher.py @@ -26,14 +26,14 @@ joblib=dict(n_jobs=-1, backend="loky"), processpoolexecutor=dict(max_workers=2, mp_context=None), dask=dict(client=None), - slurm=dict(tmp_script_folder=None, cpus_per_task=1, mem="1G"), + slurm=dict(tmp_script_folder=None, sbatch_args=dict(cpus_per_task=1, mem="1G")), ) _implemented_engine = list(_default_engine_kwargs.keys()) -def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=False): +def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=False): """ Run several :py:func:`run_sorter()` sequentially or in parallel given a list of jobs. @@ -55,11 +55,10 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal Where *blocking* means that this function is blocking until the results are returned. This is in opposition to *asynchronous*, where the function returns `None` almost immediately (aka non-blocking), - but the results must be retrieved by hand when jobs are finished. No mechanisim is provided here to be know - when jobs are finish. + but the results must be retrieved by hand when jobs are finished. No mechanism is provided here to know + when jobs are finished. In this *asynchronous* case, the :py:func:`~spikeinterface.sorters.read_sorter_folder()` helps to retrieve individual results. - Parameters ---------- job_list : list of dict @@ -68,10 +67,12 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal The engine to run the list. * "loop" : a simple loop. This engine is engine_kwargs : dict + In the case of engine="slum", arguments to sbatch can be passed via sbatch_args, which is a dictionary whose + keys correspond to the --args to be passed to sbatch. return_output : bool, dfault False Return a sortings or None. 
- This also overwrite kwargs in in run_sorter(with_sorting=True/False) + This also overwrite kwargs in run_sorter(with_sorting=True/False) Returns ------- @@ -81,6 +82,7 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal assert engine in _implemented_engine, f"engine must be in {_implemented_engine}" + engine_kwargs = {} if None else engine_kwargs engine_kwargs_ = dict() engine_kwargs_.update(_default_engine_kwargs[engine]) engine_kwargs_.update(engine_kwargs) @@ -150,8 +152,6 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal if tmp_script_folder is None: tmp_script_folder = tempfile.mkdtemp(prefix="spikeinterface_slurm_") tmp_script_folder = Path(tmp_script_folder) - cpus_per_task = engine_kwargs["cpus_per_task"] - mem = engine_kwargs["mem"] tmp_script_folder.mkdir(exist_ok=True, parents=True) @@ -180,8 +180,8 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal ) f.write(slurm_script) os.fchmod(f.fileno(), mode=stat.S_IRWXU) - - subprocess.Popen(["sbatch", str(script_name.absolute()), f"-cpus-per-task={cpus_per_task}", f"-mem={mem}"]) + sbatch_args = ' '.join(['--{k}={v}' for k, v in engine_kwargs['sbatch_args'].items()]) + subprocess.Popen("sbatch", str(script_name.absolute()), sbatch_args) return out From 4e59f23a5247f74670fe1b775e7abbbe355fc454 Mon Sep 17 00:00:00 2001 From: Marin Manuel Date: Sat, 29 Jun 2024 19:56:04 +0000 Subject: [PATCH 03/23] cleaned up code --- src/spikeinterface/sorters/launcher.py | 39 ++++++++++++++++++++------ 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/src/spikeinterface/sorters/launcher.py b/src/spikeinterface/sorters/launcher.py index 53cf0aed71..7882330d08 100644 --- a/src/spikeinterface/sorters/launcher.py +++ b/src/spikeinterface/sorters/launcher.py @@ -26,7 +26,7 @@ joblib=dict(n_jobs=-1, backend="loky"), processpoolexecutor=dict(max_workers=2, mp_context=None), dask=dict(client=None), - slurm=dict(tmp_script_folder=None, sbatch_args=dict(cpus_per_task=1, mem="1G")), + slurm={"tmp_script_folder": None, "sbatch_executable_path": "sbatch", "cpus-per-task": 1, "mem": "1G"}, ) @@ -67,10 +67,16 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=F The engine to run the list. * "loop" : a simple loop. This engine is engine_kwargs : dict - In the case of engine="slum", arguments to sbatch can be passed via sbatch_args, which is a dictionary whose - keys correspond to the --args to be passed to sbatch. - - return_output : bool, dfault False + In the case of engine="slum", possible kwargs are: + - tmp_script_folder: str, default None + the folder in which the job scripts are created. Default: directory created by + the `tempfile` library + - sbatch_executable_path: str, default 'sbatch' + the path to the `sbatch` executable + - other kwargs are interpreted as arguments to sbatch, and are translated to the --args to be passed to sbatch. + see the [documentation for `sbatch`](https://slurm.schedmd.com/sbatch.html) for a list of possible arguments + + return_output : bool, default False Return a sortings or None. 
This also overwrite kwargs in run_sorter(with_sorting=True/False) @@ -82,7 +88,8 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=F assert engine in _implemented_engine, f"engine must be in {_implemented_engine}" - engine_kwargs = {} if None else engine_kwargs + if engine_kwargs is None: + engine_kwargs = dict() engine_kwargs_ = dict() engine_kwargs_.update(_default_engine_kwargs[engine]) engine_kwargs_.update(engine_kwargs) @@ -148,10 +155,18 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=F elif engine == "slurm": # generate python script for slurm - tmp_script_folder = engine_kwargs["tmp_script_folder"] + tmp_script_folder = engine_kwargs.pop("tmp_script_folder") if tmp_script_folder is None: tmp_script_folder = tempfile.mkdtemp(prefix="spikeinterface_slurm_") tmp_script_folder = Path(tmp_script_folder) + sbatch_executable = engine_kwargs.pop("sbatch_executable_path") + + # for backward compatibility with previous version + if "cpus_per_task" in engine_kwargs: + warnings.warn("cpus_per_task is deprecated, use cpus-per-task instead", DeprecationWarning) + cpus_per_task = engine_kwargs.pop("cpus_per_task") + if "cpus-per-task" not in engine_kwargs: + engine_kwargs["cpus-per-task"] = cpus_per_task tmp_script_folder.mkdir(exist_ok=True, parents=True) @@ -180,8 +195,14 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=F ) f.write(slurm_script) os.fchmod(f.fileno(), mode=stat.S_IRWXU) - sbatch_args = ' '.join(['--{k}={v}' for k, v in engine_kwargs['sbatch_args'].items()]) - subprocess.Popen("sbatch", str(script_name.absolute()), sbatch_args) + + progr = [sbatch_executable] + for k, v in engine_kwargs.items(): + progr.append(f"--{k}") + progr.append(f"{v}") + progr.append(str(script_name.absolute())) + p = subprocess.run(progr, capture_output=True, text=True) + print(p.stdout) return out From d204642218c2e8af51a045f446271d6ae635e217 Mon Sep 17 00:00:00 2001 From: Marin Manuel Date: Tue, 2 Jul 2024 14:36:52 -0500 Subject: [PATCH 04/23] updated to use sbatch_kwargs instead of putting all slurm arguments directly into engine_kwargs --- src/spikeinterface/sorters/launcher.py | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/src/spikeinterface/sorters/launcher.py b/src/spikeinterface/sorters/launcher.py index 7882330d08..4c8998eee5 100644 --- a/src/spikeinterface/sorters/launcher.py +++ b/src/spikeinterface/sorters/launcher.py @@ -26,7 +26,8 @@ joblib=dict(n_jobs=-1, backend="loky"), processpoolexecutor=dict(max_workers=2, mp_context=None), dask=dict(client=None), - slurm={"tmp_script_folder": None, "sbatch_executable_path": "sbatch", "cpus-per-task": 1, "mem": "1G"}, + slurm={"tmp_script_folder": None, "sbatch_executable_path": "sbatch", + 'sbatch_kwargs':{"cpus-per-task": 1, "mem": "1G"}}, ) @@ -73,7 +74,8 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=F the `tempfile` library - sbatch_executable_path: str, default 'sbatch' the path to the `sbatch` executable - - other kwargs are interpreted as arguments to sbatch, and are translated to the --args to be passed to sbatch. + - sbatch_kwargs: dict + arguments to be passed to sbatch. They are translated to the --args form. 
see the [documentation for `sbatch`](https://slurm.schedmd.com/sbatch.html) for a list of possible arguments return_output : bool, default False @@ -155,21 +157,14 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=F elif engine == "slurm": # generate python script for slurm - tmp_script_folder = engine_kwargs.pop("tmp_script_folder") + tmp_script_folder = engine_kwargs["tmp_script_folder"] if tmp_script_folder is None: tmp_script_folder = tempfile.mkdtemp(prefix="spikeinterface_slurm_") tmp_script_folder = Path(tmp_script_folder) - sbatch_executable = engine_kwargs.pop("sbatch_executable_path") - - # for backward compatibility with previous version - if "cpus_per_task" in engine_kwargs: - warnings.warn("cpus_per_task is deprecated, use cpus-per-task instead", DeprecationWarning) - cpus_per_task = engine_kwargs.pop("cpus_per_task") - if "cpus-per-task" not in engine_kwargs: - engine_kwargs["cpus-per-task"] = cpus_per_task - tmp_script_folder.mkdir(exist_ok=True, parents=True) + sbatch_executable = engine_kwargs["sbatch_executable_path"] + for i, kwargs in enumerate(job_list): script_name = tmp_script_folder / f"si_script_{i}.py" with open(script_name, "w") as f: @@ -197,7 +192,7 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=F os.fchmod(f.fileno(), mode=stat.S_IRWXU) progr = [sbatch_executable] - for k, v in engine_kwargs.items(): + for k, v in engine_kwargs['sbatch_kwargs'].items(): progr.append(f"--{k}") progr.append(f"{v}") progr.append(str(script_name.absolute())) From 8fedd95eaf29671dc07067fc93a30d1ac0cadeab Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 2 Jul 2024 19:37:34 +0000 Subject: [PATCH 05/23] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/spikeinterface/sorters/launcher.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/spikeinterface/sorters/launcher.py b/src/spikeinterface/sorters/launcher.py index 4c8998eee5..9a4d48e40f 100644 --- a/src/spikeinterface/sorters/launcher.py +++ b/src/spikeinterface/sorters/launcher.py @@ -26,8 +26,11 @@ joblib=dict(n_jobs=-1, backend="loky"), processpoolexecutor=dict(max_workers=2, mp_context=None), dask=dict(client=None), - slurm={"tmp_script_folder": None, "sbatch_executable_path": "sbatch", - 'sbatch_kwargs':{"cpus-per-task": 1, "mem": "1G"}}, + slurm={ + "tmp_script_folder": None, + "sbatch_executable_path": "sbatch", + "sbatch_kwargs": {"cpus-per-task": 1, "mem": "1G"}, + }, ) @@ -192,7 +195,7 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=F os.fchmod(f.fileno(), mode=stat.S_IRWXU) progr = [sbatch_executable] - for k, v in engine_kwargs['sbatch_kwargs'].items(): + for k, v in engine_kwargs["sbatch_kwargs"].items(): progr.append(f"--{k}") progr.append(f"{v}") progr.append(str(script_name.absolute())) From 073358983b3ea058b4568d127317029d0be4719e Mon Sep 17 00:00:00 2001 From: Marin Manuel Date: Wed, 3 Jul 2024 10:24:32 -0500 Subject: [PATCH 06/23] removed sbatch_executable from the list of kwargs --- src/spikeinterface/sorters/launcher.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/spikeinterface/sorters/launcher.py b/src/spikeinterface/sorters/launcher.py index 4c8998eee5..9c7a1914a5 100644 --- a/src/spikeinterface/sorters/launcher.py +++ b/src/spikeinterface/sorters/launcher.py @@ -26,8 +26,8 @@ joblib=dict(n_jobs=-1, backend="loky"), 
processpoolexecutor=dict(max_workers=2, mp_context=None), dask=dict(client=None), - slurm={"tmp_script_folder": None, "sbatch_executable_path": "sbatch", - 'sbatch_kwargs':{"cpus-per-task": 1, "mem": "1G"}}, + slurm={"tmp_script_folder": None, + 'sbatch_args': {"cpus-per-task": 1, "mem": "1G"}}, ) @@ -72,8 +72,6 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=F - tmp_script_folder: str, default None the folder in which the job scripts are created. Default: directory created by the `tempfile` library - - sbatch_executable_path: str, default 'sbatch' - the path to the `sbatch` executable - sbatch_kwargs: dict arguments to be passed to sbatch. They are translated to the --args form. see the [documentation for `sbatch`](https://slurm.schedmd.com/sbatch.html) for a list of possible arguments @@ -163,8 +161,6 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=F tmp_script_folder = Path(tmp_script_folder) tmp_script_folder.mkdir(exist_ok=True, parents=True) - sbatch_executable = engine_kwargs["sbatch_executable_path"] - for i, kwargs in enumerate(job_list): script_name = tmp_script_folder / f"si_script_{i}.py" with open(script_name, "w") as f: @@ -191,8 +187,8 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=F f.write(slurm_script) os.fchmod(f.fileno(), mode=stat.S_IRWXU) - progr = [sbatch_executable] - for k, v in engine_kwargs['sbatch_kwargs'].items(): + progr = ['sbatch'] + for k, v in engine_kwargs['sbatch_args'].items(): progr.append(f"--{k}") progr.append(f"{v}") progr.append(str(script_name.absolute())) From 9c3ff1db49320a01cafebe0b029f0caca7a0f674 Mon Sep 17 00:00:00 2001 From: Marin Manuel Date: Wed, 3 Jul 2024 10:25:47 -0500 Subject: [PATCH 07/23] removed sbatch_executable from the list of kwargs --- src/spikeinterface/sorters/launcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/spikeinterface/sorters/launcher.py b/src/spikeinterface/sorters/launcher.py index 9c7a1914a5..1e7a39ecef 100644 --- a/src/spikeinterface/sorters/launcher.py +++ b/src/spikeinterface/sorters/launcher.py @@ -72,7 +72,7 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=F - tmp_script_folder: str, default None the folder in which the job scripts are created. Default: directory created by the `tempfile` library - - sbatch_kwargs: dict + - sbatch_args: dict arguments to be passed to sbatch. They are translated to the --args form. 
see the [documentation for `sbatch`](https://slurm.schedmd.com/sbatch.html) for a list of possible arguments From c86f3b2e98ce8158de9bcc3f78a96efcace21956 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 3 Jul 2024 15:27:00 +0000 Subject: [PATCH 08/23] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/spikeinterface/sorters/launcher.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/spikeinterface/sorters/launcher.py b/src/spikeinterface/sorters/launcher.py index 1e7a39ecef..9eed8b2fe1 100644 --- a/src/spikeinterface/sorters/launcher.py +++ b/src/spikeinterface/sorters/launcher.py @@ -26,8 +26,7 @@ joblib=dict(n_jobs=-1, backend="loky"), processpoolexecutor=dict(max_workers=2, mp_context=None), dask=dict(client=None), - slurm={"tmp_script_folder": None, - 'sbatch_args': {"cpus-per-task": 1, "mem": "1G"}}, + slurm={"tmp_script_folder": None, "sbatch_args": {"cpus-per-task": 1, "mem": "1G"}}, ) @@ -187,8 +186,8 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=F f.write(slurm_script) os.fchmod(f.fileno(), mode=stat.S_IRWXU) - progr = ['sbatch'] - for k, v in engine_kwargs['sbatch_args'].items(): + progr = ["sbatch"] + for k, v in engine_kwargs["sbatch_args"].items(): progr.append(f"--{k}") progr.append(f"{v}") progr.append(str(script_name.absolute())) From f12461f762d87c60691e838cc5f450ba8b79840d Mon Sep 17 00:00:00 2001 From: MANUEL lab <65401298+MarinManuel@users.noreply.github.com> Date: Thu, 15 Aug 2024 21:17:02 -0400 Subject: [PATCH 09/23] clarified docstring and added error for cpus_per_taks --- src/spikeinterface/sorters/launcher.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/spikeinterface/sorters/launcher.py b/src/spikeinterface/sorters/launcher.py index 9eed8b2fe1..360b277cda 100644 --- a/src/spikeinterface/sorters/launcher.py +++ b/src/spikeinterface/sorters/launcher.py @@ -4,7 +4,6 @@ from __future__ import annotations - from pathlib import Path import shutil import numpy as np @@ -29,7 +28,6 @@ slurm={"tmp_script_folder": None, "sbatch_args": {"cpus-per-task": 1, "mem": "1G"}}, ) - _implemented_engine = list(_default_engine_kwargs.keys()) @@ -72,8 +70,9 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=F the folder in which the job scripts are created. Default: directory created by the `tempfile` library - sbatch_args: dict - arguments to be passed to sbatch. They are translated to the --args form. - see the [documentation for `sbatch`](https://slurm.schedmd.com/sbatch.html) for a list of possible arguments + arguments to be passed to sbatch. They will be automatically prefixed with --. + Arguments must be in the format slurm specify, see the + [documentation for `sbatch`](https://slurm.schedmd.com/sbatch.html) for a list of possible arguments return_output : bool, default False Return a sortings or None. 
@@ -153,6 +152,9 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=F task.result() elif engine == "slurm": + if 'cpus_per_task' in engine_kwargs: + raise ValueError('keyword argument cpus_per_task is no longer supported for slurm engine, ' + 'please use cpus-per-task instead.') # generate python script for slurm tmp_script_folder = engine_kwargs["tmp_script_folder"] if tmp_script_folder is None: @@ -193,6 +195,8 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=F progr.append(str(script_name.absolute())) p = subprocess.run(progr, capture_output=True, text=True) print(p.stdout) + if len(p.stderr) > 0: + warnings.warn(p.stderr) return out From 002e9591df9dd6c8fc3e5f71fcddb2b3fa5e1ba8 Mon Sep 17 00:00:00 2001 From: MANUEL lab <65401298+MarinManuel@users.noreply.github.com> Date: Thu, 15 Aug 2024 21:17:12 -0400 Subject: [PATCH 10/23] added test --- .../sorters/tests/test_launcher.py | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/src/spikeinterface/sorters/tests/test_launcher.py b/src/spikeinterface/sorters/tests/test_launcher.py index 362d45cbff..eff6d63804 100644 --- a/src/spikeinterface/sorters/tests/test_launcher.py +++ b/src/spikeinterface/sorters/tests/test_launcher.py @@ -125,6 +125,74 @@ def test_run_sorter_jobs_slurm(job_list, create_cache_folder): ), ) +@pytest.mark.skip("Slurm launcher need a machine with slurm") +def test_run_sorter_jobs_slurm_kwargs(mocker, tmp_path, job_list): + """ + Mock `subprocess.run()` to check that engine_kwargs are + propagated to the call as expected. + """ + # First, mock `subprocess.run()`, set up a call to `run_sorter_jobs` + # then check the mocked `subprocess.run()` was called with the + # expected signature. Two jobs are passed in `jobs_list`, first + # check the most recent call. + mock_subprocess_run = mocker.patch( + "spikeinterface.sorters.launcher.subprocess.run" + ) + + tmp_script_folder = tmp_path / "slurm_scripts" + + engine_kwargs =dict( + tmp_script_folder=tmp_script_folder, + sbatch_args={ + "cpus-per-task": 32, + "mem": "32G", + "gres": "gpu:1", + "any_random_kwarg": 12322, + } + ) + run_sorter_jobs( + job_list, + engine="slurm", + engine_kwargs=engine_kwargs, + ) + + script_0_path = f"{tmp_script_folder}/si_script_0.py" + script_1_path = f"{tmp_script_folder}/si_script_1.py" + + expected_command = [ + "sbatch", "--cpus-per-task", "32", "--mem", "32G", "--gres", "gpu:1", "--any_random_kwarg", "12322", script_1_path + ] + mock_subprocess_run.assert_called_with(expected_command, capture_output=True, text=True) + + # Next, check the fisrt call (which sets up `si_script_0.py`) + # also has the expected arguments. + expected_command[9] = script_0_path + assert mock_subprocess_run.call_args_list[0].args[0] == expected_command + + # Next, check that defaults are used properly when no kwargs are + # passed. This will default to `_default_engine_kwargs` as + # set in `launcher.py` + run_sorter_jobs( + job_list, + engine="slurm", + engine_kwargs={"tmp_script_folder": tmp_script_folder}, + ) + expected_command = [ + "sbatch", "--cpus-per-task", "1", "--mem", "1G", script_1_path + ] + mock_subprocess_run.assert_called_with(expected_command, capture_output=True, text=True) + + # Finally, check that the `tmp_script_folder` is generated on the + # fly as expected. A random foldername is generated, just check that + # the folder to which the scripts are saved is in the `tempfile` format. 
+ run_sorter_jobs( + job_list, + engine="slurm", + engine_kwargs=None, # TODO: test defaults + ) + tmp_script_folder = "_".join(tempfile.mkdtemp(prefix="spikeinterface_slurm_").split("_")[:-1]) + assert tmp_script_folder in mock_subprocess_run.call_args_list[-1].args[0][5] + def test_run_sorter_by_property(create_cache_folder): cache_folder = create_cache_folder @@ -157,6 +225,9 @@ def test_run_sorter_by_property(create_cache_folder): assert all([g in group_names1 for g in sorting1.get_property("group")]) + + + if __name__ == "__main__": # setup_module() tmp_folder = Path("tmp") From 88ca2f135cc805c01b2e15b251ed92d07aba795a Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 16 Aug 2024 01:17:50 +0000 Subject: [PATCH 11/23] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/spikeinterface/sorters/launcher.py | 8 +++--- .../sorters/tests/test_launcher.py | 27 ++++++++++--------- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/src/spikeinterface/sorters/launcher.py b/src/spikeinterface/sorters/launcher.py index 360b277cda..41638bbc4d 100644 --- a/src/spikeinterface/sorters/launcher.py +++ b/src/spikeinterface/sorters/launcher.py @@ -152,9 +152,11 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=F task.result() elif engine == "slurm": - if 'cpus_per_task' in engine_kwargs: - raise ValueError('keyword argument cpus_per_task is no longer supported for slurm engine, ' - 'please use cpus-per-task instead.') + if "cpus_per_task" in engine_kwargs: + raise ValueError( + "keyword argument cpus_per_task is no longer supported for slurm engine, " + "please use cpus-per-task instead." + ) # generate python script for slurm tmp_script_folder = engine_kwargs["tmp_script_folder"] if tmp_script_folder is None: diff --git a/src/spikeinterface/sorters/tests/test_launcher.py b/src/spikeinterface/sorters/tests/test_launcher.py index eff6d63804..33348d8ef9 100644 --- a/src/spikeinterface/sorters/tests/test_launcher.py +++ b/src/spikeinterface/sorters/tests/test_launcher.py @@ -125,6 +125,7 @@ def test_run_sorter_jobs_slurm(job_list, create_cache_folder): ), ) + @pytest.mark.skip("Slurm launcher need a machine with slurm") def test_run_sorter_jobs_slurm_kwargs(mocker, tmp_path, job_list): """ @@ -135,20 +136,18 @@ def test_run_sorter_jobs_slurm_kwargs(mocker, tmp_path, job_list): # then check the mocked `subprocess.run()` was called with the # expected signature. Two jobs are passed in `jobs_list`, first # check the most recent call. 
- mock_subprocess_run = mocker.patch( - "spikeinterface.sorters.launcher.subprocess.run" - ) + mock_subprocess_run = mocker.patch("spikeinterface.sorters.launcher.subprocess.run") tmp_script_folder = tmp_path / "slurm_scripts" - engine_kwargs =dict( + engine_kwargs = dict( tmp_script_folder=tmp_script_folder, sbatch_args={ "cpus-per-task": 32, "mem": "32G", "gres": "gpu:1", "any_random_kwarg": 12322, - } + }, ) run_sorter_jobs( job_list, @@ -160,7 +159,16 @@ def test_run_sorter_jobs_slurm_kwargs(mocker, tmp_path, job_list): script_1_path = f"{tmp_script_folder}/si_script_1.py" expected_command = [ - "sbatch", "--cpus-per-task", "32", "--mem", "32G", "--gres", "gpu:1", "--any_random_kwarg", "12322", script_1_path + "sbatch", + "--cpus-per-task", + "32", + "--mem", + "32G", + "--gres", + "gpu:1", + "--any_random_kwarg", + "12322", + script_1_path, ] mock_subprocess_run.assert_called_with(expected_command, capture_output=True, text=True) @@ -177,9 +185,7 @@ def test_run_sorter_jobs_slurm_kwargs(mocker, tmp_path, job_list): engine="slurm", engine_kwargs={"tmp_script_folder": tmp_script_folder}, ) - expected_command = [ - "sbatch", "--cpus-per-task", "1", "--mem", "1G", script_1_path - ] + expected_command = ["sbatch", "--cpus-per-task", "1", "--mem", "1G", script_1_path] mock_subprocess_run.assert_called_with(expected_command, capture_output=True, text=True) # Finally, check that the `tmp_script_folder` is generated on the @@ -225,9 +231,6 @@ def test_run_sorter_by_property(create_cache_folder): assert all([g in group_names1 for g in sorting1.get_property("group")]) - - - if __name__ == "__main__": # setup_module() tmp_folder = Path("tmp") From e4f9f1fe40dca8c64952c9b1fcabae5b14ea6919 Mon Sep 17 00:00:00 2001 From: MANUEL lab <65401298+MarinManuel@users.noreply.github.com> Date: Thu, 15 Aug 2024 21:17:12 -0400 Subject: [PATCH 12/23] added test --- pyproject.toml | 3 + .../sorters/tests/test_launcher.py | 71 +++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 71919c072b..9a1f084a6a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -177,6 +177,9 @@ test = [ # for release we need pypi, so this need to be commented "probeinterface @ git+https://github.com/SpikeInterface/probeinterface.git", "neo @ git+https://github.com/NeuralEnsemble/python-neo.git", + + # for slurm launcher + "pytest-mock", ] docs = [ diff --git a/src/spikeinterface/sorters/tests/test_launcher.py b/src/spikeinterface/sorters/tests/test_launcher.py index 33348d8ef9..86ea3c2016 100644 --- a/src/spikeinterface/sorters/tests/test_launcher.py +++ b/src/spikeinterface/sorters/tests/test_launcher.py @@ -125,6 +125,74 @@ def test_run_sorter_jobs_slurm(job_list, create_cache_folder): ), ) +@pytest.mark.skip("Slurm launcher need a machine with slurm") +def test_run_sorter_jobs_slurm_kwargs(mocker, tmp_path, job_list): + """ + Mock `subprocess.run()` to check that engine_kwargs are + propagated to the call as expected. + """ + # First, mock `subprocess.run()`, set up a call to `run_sorter_jobs` + # then check the mocked `subprocess.run()` was called with the + # expected signature. Two jobs are passed in `jobs_list`, first + # check the most recent call. 
+ mock_subprocess_run = mocker.patch( + "spikeinterface.sorters.launcher.subprocess.run" + ) + + tmp_script_folder = tmp_path / "slurm_scripts" + + engine_kwargs =dict( + tmp_script_folder=tmp_script_folder, + sbatch_args={ + "cpus-per-task": 32, + "mem": "32G", + "gres": "gpu:1", + "any_random_kwarg": 12322, + } + ) + run_sorter_jobs( + job_list, + engine="slurm", + engine_kwargs=engine_kwargs, + ) + + script_0_path = f"{tmp_script_folder}/si_script_0.py" + script_1_path = f"{tmp_script_folder}/si_script_1.py" + + expected_command = [ + "sbatch", "--cpus-per-task", "32", "--mem", "32G", "--gres", "gpu:1", "--any_random_kwarg", "12322", script_1_path + ] + mock_subprocess_run.assert_called_with(expected_command, capture_output=True, text=True) + + # Next, check the fisrt call (which sets up `si_script_0.py`) + # also has the expected arguments. + expected_command[9] = script_0_path + assert mock_subprocess_run.call_args_list[0].args[0] == expected_command + + # Next, check that defaults are used properly when no kwargs are + # passed. This will default to `_default_engine_kwargs` as + # set in `launcher.py` + run_sorter_jobs( + job_list, + engine="slurm", + engine_kwargs={"tmp_script_folder": tmp_script_folder}, + ) + expected_command = [ + "sbatch", "--cpus-per-task", "1", "--mem", "1G", script_1_path + ] + mock_subprocess_run.assert_called_with(expected_command, capture_output=True, text=True) + + # Finally, check that the `tmp_script_folder` is generated on the + # fly as expected. A random foldername is generated, just check that + # the folder to which the scripts are saved is in the `tempfile` format. + run_sorter_jobs( + job_list, + engine="slurm", + engine_kwargs=None, # TODO: test defaults + ) + tmp_script_folder = "_".join(tempfile.mkdtemp(prefix="spikeinterface_slurm_").split("_")[:-1]) + assert tmp_script_folder in mock_subprocess_run.call_args_list[-1].args[0][5] + @pytest.mark.skip("Slurm launcher need a machine with slurm") def test_run_sorter_jobs_slurm_kwargs(mocker, tmp_path, job_list): @@ -231,6 +299,9 @@ def test_run_sorter_by_property(create_cache_folder): assert all([g in group_names1 for g in sorting1.get_property("group")]) + + + if __name__ == "__main__": # setup_module() tmp_folder = Path("tmp") From e4b0b811f2f0dec24318be777988577b1708125d Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 16 Aug 2024 01:26:03 +0000 Subject: [PATCH 13/23] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../sorters/tests/test_launcher.py | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/src/spikeinterface/sorters/tests/test_launcher.py b/src/spikeinterface/sorters/tests/test_launcher.py index 86ea3c2016..3177c700d3 100644 --- a/src/spikeinterface/sorters/tests/test_launcher.py +++ b/src/spikeinterface/sorters/tests/test_launcher.py @@ -125,6 +125,7 @@ def test_run_sorter_jobs_slurm(job_list, create_cache_folder): ), ) + @pytest.mark.skip("Slurm launcher need a machine with slurm") def test_run_sorter_jobs_slurm_kwargs(mocker, tmp_path, job_list): """ @@ -135,20 +136,18 @@ def test_run_sorter_jobs_slurm_kwargs(mocker, tmp_path, job_list): # then check the mocked `subprocess.run()` was called with the # expected signature. Two jobs are passed in `jobs_list`, first # check the most recent call. 
- mock_subprocess_run = mocker.patch( - "spikeinterface.sorters.launcher.subprocess.run" - ) + mock_subprocess_run = mocker.patch("spikeinterface.sorters.launcher.subprocess.run") tmp_script_folder = tmp_path / "slurm_scripts" - engine_kwargs =dict( + engine_kwargs = dict( tmp_script_folder=tmp_script_folder, sbatch_args={ "cpus-per-task": 32, "mem": "32G", "gres": "gpu:1", "any_random_kwarg": 12322, - } + }, ) run_sorter_jobs( job_list, @@ -160,7 +159,16 @@ def test_run_sorter_jobs_slurm_kwargs(mocker, tmp_path, job_list): script_1_path = f"{tmp_script_folder}/si_script_1.py" expected_command = [ - "sbatch", "--cpus-per-task", "32", "--mem", "32G", "--gres", "gpu:1", "--any_random_kwarg", "12322", script_1_path + "sbatch", + "--cpus-per-task", + "32", + "--mem", + "32G", + "--gres", + "gpu:1", + "--any_random_kwarg", + "12322", + script_1_path, ] mock_subprocess_run.assert_called_with(expected_command, capture_output=True, text=True) @@ -177,9 +185,7 @@ def test_run_sorter_jobs_slurm_kwargs(mocker, tmp_path, job_list): engine="slurm", engine_kwargs={"tmp_script_folder": tmp_script_folder}, ) - expected_command = [ - "sbatch", "--cpus-per-task", "1", "--mem", "1G", script_1_path - ] + expected_command = ["sbatch", "--cpus-per-task", "1", "--mem", "1G", script_1_path] mock_subprocess_run.assert_called_with(expected_command, capture_output=True, text=True) # Finally, check that the `tmp_script_folder` is generated on the @@ -299,9 +305,6 @@ def test_run_sorter_by_property(create_cache_folder): assert all([g in group_names1 for g in sorting1.get_property("group")]) - - - if __name__ == "__main__": # setup_module() tmp_folder = Path("tmp") From 5b6d560a0aaa1e5ea64e771f514fb54f277158b4 Mon Sep 17 00:00:00 2001 From: MANUEL lab <65401298+MarinManuel@users.noreply.github.com> Date: Wed, 11 Sep 2024 13:36:18 -0400 Subject: [PATCH 14/23] docstring fix Co-authored-by: Zach McKenzie <92116279+zm711@users.noreply.github.com> --- src/spikeinterface/sorters/launcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/spikeinterface/sorters/launcher.py b/src/spikeinterface/sorters/launcher.py index bbd13de2a5..0cff4cdf0e 100644 --- a/src/spikeinterface/sorters/launcher.py +++ b/src/spikeinterface/sorters/launcher.py @@ -66,7 +66,7 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=F * "loop" : a simple loop. This engine is engine_kwargs : dict In the case of engine="slum", possible kwargs are: - - tmp_script_folder: str, default None + - tmp_script_folder : str, default: None the folder in which the job scripts are created. 
Default: directory created by the `tempfile` library - sbatch_args: dict From 35a2a7e2e06d11900f182ccca6facaf7f858ed7b Mon Sep 17 00:00:00 2001 From: MANUEL lab <65401298+MarinManuel@users.noreply.github.com> Date: Wed, 11 Sep 2024 13:36:33 -0400 Subject: [PATCH 15/23] docstring fix Co-authored-by: Zach McKenzie <92116279+zm711@users.noreply.github.com> --- src/spikeinterface/sorters/launcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/spikeinterface/sorters/launcher.py b/src/spikeinterface/sorters/launcher.py index 0cff4cdf0e..8fdfbe18cc 100644 --- a/src/spikeinterface/sorters/launcher.py +++ b/src/spikeinterface/sorters/launcher.py @@ -74,7 +74,7 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=F Arguments must be in the format slurm specify, see the [documentation for `sbatch`](https://slurm.schedmd.com/sbatch.html) for a list of possible arguments - return_output : bool, default False + return_output : bool, default: False Return a sortings or None. This also overwrite kwargs in run_sorter(with_sorting=True/False) From 910fa616fba371dac87adef9f1725e7a2dbed846 Mon Sep 17 00:00:00 2001 From: MANUEL lab <65401298+MarinManuel@users.noreply.github.com> Date: Wed, 11 Sep 2024 13:37:04 -0400 Subject: [PATCH 16/23] docstring fix Co-authored-by: Joe Ziminski <55797454+JoeZiminski@users.noreply.github.com> --- src/spikeinterface/sorters/launcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/spikeinterface/sorters/launcher.py b/src/spikeinterface/sorters/launcher.py index 8fdfbe18cc..e8ccd40b14 100644 --- a/src/spikeinterface/sorters/launcher.py +++ b/src/spikeinterface/sorters/launcher.py @@ -76,7 +76,7 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=F return_output : bool, default: False Return a sortings or None. - This also overwrite kwargs in run_sorter(with_sorting=True/False) + This also overwrites kwargs in run_sorter(with_sorting=True/False) Returns ------- From 6ba8423d7c088975479c945bd37369ab2a1943d2 Mon Sep 17 00:00:00 2001 From: MANUEL lab <65401298+MarinManuel@users.noreply.github.com> Date: Wed, 11 Sep 2024 14:13:55 -0400 Subject: [PATCH 17/23] added slurm_kwargs argument --- src/spikeinterface/sorters/launcher.py | 58 +++++++++++++++----------- 1 file changed, 33 insertions(+), 25 deletions(-) diff --git a/src/spikeinterface/sorters/launcher.py b/src/spikeinterface/sorters/launcher.py index e8ccd40b14..723d1cb393 100644 --- a/src/spikeinterface/sorters/launcher.py +++ b/src/spikeinterface/sorters/launcher.py @@ -25,13 +25,15 @@ joblib=dict(n_jobs=-1, backend="loky"), processpoolexecutor=dict(max_workers=2, mp_context=None), dask=dict(client=None), - slurm={"tmp_script_folder": None, "sbatch_args": {"cpus-per-task": 1, "mem": "1G"}}, + slurm={"tmp_script_folder": None}, ) +_default_slurm_kwargs = {'cpus-per-task': 1, 'mem': '1G'} + _implemented_engine = list(_default_engine_kwargs.keys()) -def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=False): +def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, slurm_kwargs=None, return_output=False): """ Run several :py:func:`run_sorter()` sequentially or in parallel given a list of jobs. @@ -63,16 +65,20 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=F A list a dict that are propagated to run_sorter(...) engine : str "loop", "joblib", "dask", "slurm" The engine to run the list. - * "loop" : a simple loop. 
This engine is engine_kwargs : dict - In the case of engine="slum", possible kwargs are: - - tmp_script_folder : str, default: None - the folder in which the job scripts are created. Default: directory created by - the `tempfile` library - - sbatch_args: dict - arguments to be passed to sbatch. They will be automatically prefixed with --. - Arguments must be in the format slurm specify, see the - [documentation for `sbatch`](https://slurm.schedmd.com/sbatch.html) for a list of possible arguments + Parameters to be passed to the underlying engine. + Defaults are: + * loop : None + * joblib : n_jobs=1, backend="loky" + * multiprocessing : max_workers=2, mp_context=None + * dask : client=None + * slurm : tmp_script_folder=None + slurm_kwargs: dict + Exclusively for engine="slum", ignored for all other engines. + This dictionary contains arguments to be passed to sbatch. + They will be automatically prefixed with --. + Arguments must be in the format slurm specify, see the + [documentation for `sbatch`](https://slurm.schedmd.com/sbatch.html) for a list of possible arguments return_output : bool, default: False Return a sortings or None. @@ -155,7 +161,7 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=F if "cpus_per_task" in engine_kwargs: raise ValueError( "keyword argument cpus_per_task is no longer supported for slurm engine, " - "please use cpus-per-task instead." + "please use cpus-per-task in `slurm_kwarg` instead." ) # generate python script for slurm tmp_script_folder = engine_kwargs["tmp_script_folder"] @@ -191,7 +197,9 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=F os.fchmod(f.fileno(), mode=stat.S_IRWXU) progr = ["sbatch"] - for k, v in engine_kwargs["sbatch_args"].items(): + if slurm_kwargs is None: + slurm_kwargs = _default_slurm_kwargs + for k, v in slurm_kwargs.items(): progr.append(f"--{k}") progr.append(f"{v}") progr.append(str(script_name.absolute())) @@ -220,18 +228,18 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=F def run_sorter_by_property( - sorter_name, - recording, - grouping_property, - folder, - mode_if_folder_exists=None, - engine="loop", - engine_kwargs={}, - verbose=False, - docker_image=None, - singularity_image=None, - working_folder: None = None, - **sorter_params, + sorter_name, + recording, + grouping_property, + folder, + mode_if_folder_exists=None, + engine="loop", + engine_kwargs={}, + verbose=False, + docker_image=None, + singularity_image=None, + working_folder: None = None, + **sorter_params, ): """ Generic function to run a sorter on a recording after splitting by a "grouping_property" (e.g. "group"). 
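Throughout the series the slurm engine assembles the `sbatch` command line the same way: each key/value pair of the kwargs dict becomes a separate `--<key>` / `<value>` argument pair, followed by the path of the generated job script. A minimal standalone sketch of that translation, using the same illustrative values as the test (the script path is a placeholder):

# Sketch: how a kwargs dict becomes the sbatch argv (mirrors the loop in launcher.py).
sbatch_args = {"cpus-per-task": 32, "mem": "32G", "gres": "gpu:1"}
script_path = "/tmp/spikeinterface_slurm_example/si_script_0.py"

progr = ["sbatch"]
for k, v in sbatch_args.items():
    progr.append(f"--{k}")
    progr.append(f"{v}")
progr.append(script_path)

print(progr)
# ['sbatch', '--cpus-per-task', '32', '--mem', '32G',
#  '--gres', 'gpu:1', '/tmp/spikeinterface_slurm_example/si_script_0.py']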
From 0ec9af504a99a56bc1d57dc0bd6ff5a799fb9361 Mon Sep 17 00:00:00 2001 From: MANUEL lab <65401298+MarinManuel@users.noreply.github.com> Date: Wed, 11 Sep 2024 14:14:02 -0400 Subject: [PATCH 18/23] fixed test --- .../sorters/tests/test_launcher.py | 86 ++----------------- 1 file changed, 6 insertions(+), 80 deletions(-) diff --git a/src/spikeinterface/sorters/tests/test_launcher.py b/src/spikeinterface/sorters/tests/test_launcher.py index 3177c700d3..66f8b559f4 100644 --- a/src/spikeinterface/sorters/tests/test_launcher.py +++ b/src/spikeinterface/sorters/tests/test_launcher.py @@ -1,7 +1,7 @@ import sys import shutil +import tempfile import time - import pytest from pathlib import Path @@ -126,7 +126,6 @@ def test_run_sorter_jobs_slurm(job_list, create_cache_folder): ) -@pytest.mark.skip("Slurm launcher need a machine with slurm") def test_run_sorter_jobs_slurm_kwargs(mocker, tmp_path, job_list): """ Mock `subprocess.run()` to check that engine_kwargs are @@ -141,92 +140,19 @@ def test_run_sorter_jobs_slurm_kwargs(mocker, tmp_path, job_list): tmp_script_folder = tmp_path / "slurm_scripts" engine_kwargs = dict( - tmp_script_folder=tmp_script_folder, - sbatch_args={ + tmp_script_folder=tmp_script_folder) + slurm_kwargs={ "cpus-per-task": 32, "mem": "32G", "gres": "gpu:1", "any_random_kwarg": 12322, - }, - ) - run_sorter_jobs( - job_list, - engine="slurm", - engine_kwargs=engine_kwargs, - ) - - script_0_path = f"{tmp_script_folder}/si_script_0.py" - script_1_path = f"{tmp_script_folder}/si_script_1.py" - - expected_command = [ - "sbatch", - "--cpus-per-task", - "32", - "--mem", - "32G", - "--gres", - "gpu:1", - "--any_random_kwarg", - "12322", - script_1_path, - ] - mock_subprocess_run.assert_called_with(expected_command, capture_output=True, text=True) - - # Next, check the fisrt call (which sets up `si_script_0.py`) - # also has the expected arguments. - expected_command[9] = script_0_path - assert mock_subprocess_run.call_args_list[0].args[0] == expected_command - - # Next, check that defaults are used properly when no kwargs are - # passed. This will default to `_default_engine_kwargs` as - # set in `launcher.py` - run_sorter_jobs( - job_list, - engine="slurm", - engine_kwargs={"tmp_script_folder": tmp_script_folder}, - ) - expected_command = ["sbatch", "--cpus-per-task", "1", "--mem", "1G", script_1_path] - mock_subprocess_run.assert_called_with(expected_command, capture_output=True, text=True) - - # Finally, check that the `tmp_script_folder` is generated on the - # fly as expected. A random foldername is generated, just check that - # the folder to which the scripts are saved is in the `tempfile` format. - run_sorter_jobs( - job_list, - engine="slurm", - engine_kwargs=None, # TODO: test defaults - ) - tmp_script_folder = "_".join(tempfile.mkdtemp(prefix="spikeinterface_slurm_").split("_")[:-1]) - assert tmp_script_folder in mock_subprocess_run.call_args_list[-1].args[0][5] - - -@pytest.mark.skip("Slurm launcher need a machine with slurm") -def test_run_sorter_jobs_slurm_kwargs(mocker, tmp_path, job_list): - """ - Mock `subprocess.run()` to check that engine_kwargs are - propagated to the call as expected. - """ - # First, mock `subprocess.run()`, set up a call to `run_sorter_jobs` - # then check the mocked `subprocess.run()` was called with the - # expected signature. Two jobs are passed in `jobs_list`, first - # check the most recent call. 
- mock_subprocess_run = mocker.patch("spikeinterface.sorters.launcher.subprocess.run") - - tmp_script_folder = tmp_path / "slurm_scripts" + } - engine_kwargs = dict( - tmp_script_folder=tmp_script_folder, - sbatch_args={ - "cpus-per-task": 32, - "mem": "32G", - "gres": "gpu:1", - "any_random_kwarg": 12322, - }, - ) run_sorter_jobs( job_list, engine="slurm", engine_kwargs=engine_kwargs, + slurm_kwargs=slurm_kwargs ) script_0_path = f"{tmp_script_folder}/si_script_0.py" @@ -268,7 +194,7 @@ def test_run_sorter_jobs_slurm_kwargs(mocker, tmp_path, job_list): run_sorter_jobs( job_list, engine="slurm", - engine_kwargs=None, # TODO: test defaults + engine_kwargs=None, ) tmp_script_folder = "_".join(tempfile.mkdtemp(prefix="spikeinterface_slurm_").split("_")[:-1]) assert tmp_script_folder in mock_subprocess_run.call_args_list[-1].args[0][5] From e7071704b01a6db7580255993a9c58cfb18d39fe Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 11 Sep 2024 18:14:35 +0000 Subject: [PATCH 19/23] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/spikeinterface/sorters/launcher.py | 26 +++++++++---------- .../sorters/tests/test_launcher.py | 24 +++++++---------- 2 files changed, 22 insertions(+), 28 deletions(-) diff --git a/src/spikeinterface/sorters/launcher.py b/src/spikeinterface/sorters/launcher.py index 723d1cb393..5b9302ef9e 100644 --- a/src/spikeinterface/sorters/launcher.py +++ b/src/spikeinterface/sorters/launcher.py @@ -28,7 +28,7 @@ slurm={"tmp_script_folder": None}, ) -_default_slurm_kwargs = {'cpus-per-task': 1, 'mem': '1G'} +_default_slurm_kwargs = {"cpus-per-task": 1, "mem": "1G"} _implemented_engine = list(_default_engine_kwargs.keys()) @@ -228,18 +228,18 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, slurm_kwargs=No def run_sorter_by_property( - sorter_name, - recording, - grouping_property, - folder, - mode_if_folder_exists=None, - engine="loop", - engine_kwargs={}, - verbose=False, - docker_image=None, - singularity_image=None, - working_folder: None = None, - **sorter_params, + sorter_name, + recording, + grouping_property, + folder, + mode_if_folder_exists=None, + engine="loop", + engine_kwargs={}, + verbose=False, + docker_image=None, + singularity_image=None, + working_folder: None = None, + **sorter_params, ): """ Generic function to run a sorter on a recording after splitting by a "grouping_property" (e.g. "group"). 
diff --git a/src/spikeinterface/sorters/tests/test_launcher.py b/src/spikeinterface/sorters/tests/test_launcher.py index 66f8b559f4..b03843563b 100644 --- a/src/spikeinterface/sorters/tests/test_launcher.py +++ b/src/spikeinterface/sorters/tests/test_launcher.py @@ -139,21 +139,15 @@ def test_run_sorter_jobs_slurm_kwargs(mocker, tmp_path, job_list): tmp_script_folder = tmp_path / "slurm_scripts" - engine_kwargs = dict( - tmp_script_folder=tmp_script_folder) - slurm_kwargs={ - "cpus-per-task": 32, - "mem": "32G", - "gres": "gpu:1", - "any_random_kwarg": 12322, - } - - run_sorter_jobs( - job_list, - engine="slurm", - engine_kwargs=engine_kwargs, - slurm_kwargs=slurm_kwargs - ) + engine_kwargs = dict(tmp_script_folder=tmp_script_folder) + slurm_kwargs = { + "cpus-per-task": 32, + "mem": "32G", + "gres": "gpu:1", + "any_random_kwarg": 12322, + } + + run_sorter_jobs(job_list, engine="slurm", engine_kwargs=engine_kwargs, slurm_kwargs=slurm_kwargs) script_0_path = f"{tmp_script_folder}/si_script_0.py" script_1_path = f"{tmp_script_folder}/si_script_1.py" From e7a89c974ae0b34ccb8d4d538ca3447d36418c08 Mon Sep 17 00:00:00 2001 From: MANUEL lab <65401298+MarinManuel@users.noreply.github.com> Date: Thu, 12 Sep 2024 13:47:59 -0400 Subject: [PATCH 20/23] fixed test failing on Windows by limiting to slurm test to Linux --- pyproject.toml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 06d94bd522..011808fa70 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -127,6 +127,9 @@ test_core = [ # for release we need pypi, so this need to be commented "probeinterface @ git+https://github.com/SpikeInterface/probeinterface.git", "neo @ git+https://github.com/NeuralEnsemble/python-neo.git", + + # for slurm jobs, + "pytest-mock" ] test_extractors = [ @@ -178,7 +181,7 @@ test = [ "probeinterface @ git+https://github.com/SpikeInterface/probeinterface.git", "neo @ git+https://github.com/NeuralEnsemble/python-neo.git", - # for slurm launcher + # for slurm jobs "pytest-mock", ] From 48aee1be8c45a3a77fff88eade83d5f8a93f8352 Mon Sep 17 00:00:00 2001 From: MANUEL lab <65401298+MarinManuel@users.noreply.github.com> Date: Thu, 12 Sep 2024 13:58:36 -0400 Subject: [PATCH 21/23] fixed test failing on Windows by limiting to slurm test to Linux --- .../sorters/tests/test_launcher.py | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/src/spikeinterface/sorters/tests/test_launcher.py b/src/spikeinterface/sorters/tests/test_launcher.py index b03843563b..84ed851405 100644 --- a/src/spikeinterface/sorters/tests/test_launcher.py +++ b/src/spikeinterface/sorters/tests/test_launcher.py @@ -4,7 +4,7 @@ import time import pytest from pathlib import Path - +from platform import system from spikeinterface import generate_ground_truth_recording from spikeinterface.sorters import run_sorter_jobs, run_sorter_by_property @@ -126,6 +126,7 @@ def test_run_sorter_jobs_slurm(job_list, create_cache_folder): ) +@pytest.mark.skipif(system() != 'Linux', reason="Assumes we are on Linux to run SLURM") def test_run_sorter_jobs_slurm_kwargs(mocker, tmp_path, job_list): """ Mock `subprocess.run()` to check that engine_kwargs are @@ -139,15 +140,20 @@ def test_run_sorter_jobs_slurm_kwargs(mocker, tmp_path, job_list): tmp_script_folder = tmp_path / "slurm_scripts" - engine_kwargs = dict(tmp_script_folder=tmp_script_folder) - slurm_kwargs = { - "cpus-per-task": 32, - "mem": "32G", - "gres": "gpu:1", - "any_random_kwarg": 12322, - } + engine_kwargs = dict( + 
tmp_script_folder=tmp_script_folder, + sbatch_args={ + "cpus-per-task": 32, + "mem": "32G", + "gres": "gpu:1", + "any_random_kwarg": 12322, + }) - run_sorter_jobs(job_list, engine="slurm", engine_kwargs=engine_kwargs, slurm_kwargs=slurm_kwargs) + run_sorter_jobs( + job_list, + engine="slurm", + engine_kwargs=engine_kwargs + ) script_0_path = f"{tmp_script_folder}/si_script_0.py" script_1_path = f"{tmp_script_folder}/si_script_1.py" From 6bc8be2a3f6bcf6b295ff40b1867e7bc58064884 Mon Sep 17 00:00:00 2001 From: MANUEL lab <65401298+MarinManuel@users.noreply.github.com> Date: Thu, 12 Sep 2024 13:59:00 -0400 Subject: [PATCH 22/23] reverted slurm_kwargs and improved docstring --- src/spikeinterface/sorters/launcher.py | 65 +++++++++++++------------- 1 file changed, 32 insertions(+), 33 deletions(-) diff --git a/src/spikeinterface/sorters/launcher.py b/src/spikeinterface/sorters/launcher.py index 5b9302ef9e..8da056e43e 100644 --- a/src/spikeinterface/sorters/launcher.py +++ b/src/spikeinterface/sorters/launcher.py @@ -4,36 +4,29 @@ from __future__ import annotations -from pathlib import Path -import shutil -import numpy as np -import tempfile import os import stat import subprocess import sys +import tempfile import warnings - +import numpy as np +from pathlib import Path from spikeinterface.core import aggregate_units - -from .sorterlist import sorter_dict from .runsorter import run_sorter -from .basesorter import is_log_ok _default_engine_kwargs = dict( loop=dict(), joblib=dict(n_jobs=-1, backend="loky"), processpoolexecutor=dict(max_workers=2, mp_context=None), dask=dict(client=None), - slurm={"tmp_script_folder": None}, + slurm={"tmp_script_folder": None, "sbatch_args": {"cpus-per-task": 1, "mem": "1G"}}, ) -_default_slurm_kwargs = {"cpus-per-task": 1, "mem": "1G"} - _implemented_engine = list(_default_engine_kwargs.keys()) -def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, slurm_kwargs=None, return_output=False): +def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=False): """ Run several :py:func:`run_sorter()` sequentially or in parallel given a list of jobs. @@ -67,18 +60,27 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, slurm_kwargs=No The engine to run the list. engine_kwargs : dict Parameters to be passed to the underlying engine. - Defaults are: * loop : None - * joblib : n_jobs=1, backend="loky" - * multiprocessing : max_workers=2, mp_context=None - * dask : client=None - * slurm : tmp_script_folder=None - slurm_kwargs: dict - Exclusively for engine="slum", ignored for all other engines. - This dictionary contains arguments to be passed to sbatch. - They will be automatically prefixed with --. - Arguments must be in the format slurm specify, see the - [documentation for `sbatch`](https://slurm.schedmd.com/sbatch.html) for a list of possible arguments + * joblib : + - n_jobs : int + The maximum number of concurrently running jobs (default=-1, tries to use all CPUs) + - backend : str + Specify the parallelization backend implementation (default="loky") + * multiprocessing : + - max_workers : int + maximum number of processes (default=2) + - mp_context : str + multiprocessing context (default=None) + * dask : + - client : dask.distributed.Client + Dask client to connect to (required) + * slurm : + - tmp_script_folder : str,Path + the folder in which the job scripts are created (default=None, create a random temporary directory) + - sbatch_args: dict + dictionary of arguments to be passed to the sbatch command. 
They will be automatically prefixed with --. + Arguments must be in the format slurm specify, see the [documentation for `sbatch`](https://slurm.schedmd.com/sbatch.html) + for a list of possible arguments (default={"cpus-per-task": 1, "mem": "1G"}) return_output : bool, default: False Return a sortings or None. @@ -161,7 +163,7 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, slurm_kwargs=No if "cpus_per_task" in engine_kwargs: raise ValueError( "keyword argument cpus_per_task is no longer supported for slurm engine, " - "please use cpus-per-task in `slurm_kwarg` instead." + "please use cpus-per-task instead." ) # generate python script for slurm tmp_script_folder = engine_kwargs["tmp_script_folder"] @@ -197,12 +199,11 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, slurm_kwargs=No os.fchmod(f.fileno(), mode=stat.S_IRWXU) progr = ["sbatch"] - if slurm_kwargs is None: - slurm_kwargs = _default_slurm_kwargs - for k, v in slurm_kwargs.items(): + for k, v in engine_kwargs["sbatch_args"].items(): progr.append(f"--{k}") progr.append(f"{v}") progr.append(str(script_name.absolute())) + print(f"subprocess called with command {' '.join(progr)}") p = subprocess.run(progr, capture_output=True, text=True) print(p.stdout) if len(p.stderr) > 0: @@ -234,7 +235,7 @@ def run_sorter_by_property( folder, mode_if_folder_exists=None, engine="loop", - engine_kwargs={}, + engine_kwargs=None, verbose=False, docker_image=None, singularity_image=None, @@ -264,13 +265,11 @@ def run_sorter_by_property( Must be None. This is deprecated. If not None then a warning is raise. Will be removed in next release. - engine : "loop" | "joblib" | "dask", default: "loop" + engine : "loop" | "joblib" | "dask" | "slurm", default: "loop" Which engine to use to run sorter. engine_kwargs : dict - This contains kwargs specific to the launcher engine: - * "loop" : no kwargs - * "joblib" : {"n_jobs" : } number of processes - * "dask" : {"client":} the dask client for submitting task + This contains kwargs specific to the launcher engine. + See the documentation for :py:func:`~spikeinterface.sorters.launcher.run_sorter_jobs()` for more details. 
verbose : bool, default: False Controls sorter verboseness docker_image : None or str, default: None From d2ac5046f2b4bf1908d1652c7404f64b09d301d5 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 12 Sep 2024 18:00:01 +0000 Subject: [PATCH 23/23] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/spikeinterface/sorters/tests/test_launcher.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/spikeinterface/sorters/tests/test_launcher.py b/src/spikeinterface/sorters/tests/test_launcher.py index 84ed851405..203f20b7e6 100644 --- a/src/spikeinterface/sorters/tests/test_launcher.py +++ b/src/spikeinterface/sorters/tests/test_launcher.py @@ -126,7 +126,7 @@ def test_run_sorter_jobs_slurm(job_list, create_cache_folder): ) -@pytest.mark.skipif(system() != 'Linux', reason="Assumes we are on Linux to run SLURM") +@pytest.mark.skipif(system() != "Linux", reason="Assumes we are on Linux to run SLURM") def test_run_sorter_jobs_slurm_kwargs(mocker, tmp_path, job_list): """ Mock `subprocess.run()` to check that engine_kwargs are @@ -147,14 +147,11 @@ def test_run_sorter_jobs_slurm_kwargs(mocker, tmp_path, job_list): "mem": "32G", "gres": "gpu:1", "any_random_kwarg": 12322, - }) - - run_sorter_jobs( - job_list, - engine="slurm", - engine_kwargs=engine_kwargs + }, ) + run_sorter_jobs(job_list, engine="slurm", engine_kwargs=engine_kwargs) + script_0_path = f"{tmp_script_folder}/si_script_0.py" script_1_path = f"{tmp_script_folder}/si_script_1.py"
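Taken together, the series leaves `run_sorter_jobs` with `sbatch_args` nested inside `engine_kwargs` (the separate `slurm_kwargs` argument introduced in PATCH 17 is reverted in PATCH 22). A minimal usage sketch of that final interface follows; the recordings, sorter name, job-dict keys, and resource values are illustrative assumptions, since each job dict is simply forwarded to `run_sorter(...)`:

from spikeinterface import generate_ground_truth_recording
from spikeinterface.sorters import run_sorter_jobs

# Two small synthetic recordings to sort (any run_sorter()-compatible jobs work).
rec0, _ = generate_ground_truth_recording(durations=[10.0], seed=0)
rec1, _ = generate_ground_truth_recording(durations=[10.0], seed=1)

# Each dict is forwarded to run_sorter(); the keys used here are illustrative.
job_list = [
    dict(sorter_name="tridesclous2", recording=rec0, folder="sorting_0"),
    dict(sorter_name="tridesclous2", recording=rec1, folder="sorting_1"),
]

# One sbatch submission per job, roughly:
#   sbatch --cpus-per-task 4 --mem 16G --gres gpu:1 slurm_scripts/si_script_<i>.py
run_sorter_jobs(
    job_list,
    engine="slurm",
    engine_kwargs={
        # default: a folder created with tempfile.mkdtemp(prefix="spikeinterface_slurm_")
        "tmp_script_folder": "slurm_scripts",
        # each key/value becomes a --<key> <value> pair on the sbatch command line
        "sbatch_args": {"cpus-per-task": 4, "mem": "16G", "gres": "gpu:1"},
    },
)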