From e64b8b4e99aabae273738e5f2985a651f321aa08 Mon Sep 17 00:00:00 2001
From: Samuel Garcia
Date: Wed, 6 Sep 2023 15:00:52 +0200
Subject: [PATCH 01/12] Refactor sorter launcher. Deprecate run_sorters() and add run_sorter_jobs()

---
 doc/api.rst                                 |   1 +
 doc/modules/sorters.rst                     |  37 +-
 src/spikeinterface/comparison/studytools.py |  38 +-
 src/spikeinterface/sorters/__init__.py      |   9 +-
 src/spikeinterface/sorters/basesorter.py    |  11 +
 src/spikeinterface/sorters/launcher.py      | 450 ++++++++----------
 .../sorters/tests/test_launcher.py          | 287 ++++++-----
 7 files changed, 406 insertions(+), 427 deletions(-)

diff --git a/doc/api.rst b/doc/api.rst
index 2e9fc1567a..1e8d6d62b1 100644
--- a/doc/api.rst
+++ b/doc/api.rst
@@ -212,6 +212,7 @@ spikeinterface.sorters
     .. autofunction:: print_sorter_versions
     .. autofunction:: get_sorter_description
     .. autofunction:: run_sorter
+    .. autofunction:: run_sorter_jobs
     .. autofunction:: run_sorters
     .. autofunction:: run_sorter_by_property

diff --git a/doc/modules/sorters.rst b/doc/modules/sorters.rst
index 26f2365202..ad50f9e411 100644
--- a/doc/modules/sorters.rst
+++ b/doc/modules/sorters.rst
@@ -285,27 +285,26 @@ Running several sorters in parallel
 
 The :py:mod:`~spikeinterface.sorters` module also includes tools to run several spike sorting jobs
 sequentially or in parallel. This can be done with the
-:py:func:`~spikeinterface.sorters.run_sorters()` function by specifying
+:py:func:`~spikeinterface.sorters.run_sorter_jobs()` function by specifying
 an :code:`engine` that supports parallel processing (such as :code:`joblib` or :code:`slurm`).
 
 .. code-block:: python
 
-    recordings = {'rec1' : recording, 'rec2': another_recording}
-    sorter_list = ['herdingspikes', 'tridesclous']
-    sorter_params = {
-        'herdingspikes': {'clustering_bandwidth' : 8},
-        'tridesclous': {'detect_threshold' : 5.},
-    }
-    sorting_output = run_sorters(sorter_list, recordings, working_folder='tmp_some_sorters',
-                                 mode_if_folder_exists='overwrite', sorter_params=sorter_params)
+    # here we run 2 sorters on 2 diffrents recording = 4 jobs
+    recording = ...
+    another_recording = ...
+
+    job_list = [
+        {'sorter_name': 'tridesclous', 'recording': recording, 'output_folder': '/folder1','detect_threshold': 5.},
+        {'sorter_name': 'tridesclous', 'recording': another_recording, 'output_folder': '/folder2', 'detect_threshold': 5.},
+        {'sorter_name': 'herdingspikes', 'recording': recording, 'output_folder': '/folder3', 'clustering_bandwidth': 8., 'docker_image': True},
+        {'sorter_name': 'herdingspikes', 'recording': another_recording, 'output_folder': '/folder4', 'clustering_bandwidth': 8., 'docker_image': True},
+    ]
+
+    # run in loop
+    sortings = run_sorter_jobs(job_list, engine='loop')
 
-    # the output is a dict with (rec_name, sorter_name) as keys
-    for (rec_name, sorter_name), sorting in sorting_output.items():
-        print(rec_name, sorter_name, ':', sorting.get_unit_ids())
 
-After the jobs are run, the :code:`sorting_outputs` is a dictionary with :code:`(rec_name, sorter_name)` as a key (e.g.
-:code:`('rec1', 'tridesclous')` in this example), and the corresponding :py:class:`~spikeinterface.core.BaseSorting`
-as a value.
 
 :py:func:`~spikeinterface.sorters.run_sorters` has several "engines" available to launch the computation:
 
@@ -315,13 +314,11 @@ as a value.
 
 .. 
code-block:: python - run_sorters(sorter_list, recordings, engine='loop') + run_sorter_jobs(job_list, engine='loop') - run_sorters(sorter_list, recordings, engine='joblib', - engine_kwargs={'n_jobs': 2}) + run_sorter_jobs(job_list, engine='joblib', engine_kwargs={'n_jobs': 2}) - run_sorters(sorter_list, recordings, engine='slurm', - engine_kwargs={'cpus_per_task': 10, 'mem', '5G'}) + run_sorter_jobs(job_list, engine='slurm', engine_kwargs={'cpus_per_task': 10, 'mem', '5G'}) Spike sorting by group diff --git a/src/spikeinterface/comparison/studytools.py b/src/spikeinterface/comparison/studytools.py index 79227c865f..00119c1586 100644 --- a/src/spikeinterface/comparison/studytools.py +++ b/src/spikeinterface/comparison/studytools.py @@ -22,12 +22,48 @@ from spikeinterface.core.job_tools import fix_job_kwargs from spikeinterface.extractors import NpzSortingExtractor from spikeinterface.sorters import sorter_dict -from spikeinterface.sorters.launcher import iter_working_folder, iter_sorting_output +from spikeinterface.sorters.basesorter import is_log_ok + from .comparisontools import _perf_keys from .paircomparisons import compare_sorter_to_ground_truth + + + +# This is deprecated and will be removed +def iter_working_folder(working_folder): + working_folder = Path(working_folder) + for rec_folder in working_folder.iterdir(): + if not rec_folder.is_dir(): + continue + for output_folder in rec_folder.iterdir(): + if (output_folder / "spikeinterface_job.json").is_file(): + with open(output_folder / "spikeinterface_job.json", "r") as f: + job_dict = json.load(f) + rec_name = job_dict["rec_name"] + sorter_name = job_dict["sorter_name"] + yield rec_name, sorter_name, output_folder + else: + rec_name = rec_folder.name + sorter_name = output_folder.name + if not output_folder.is_dir(): + continue + if not is_log_ok(output_folder): + continue + yield rec_name, sorter_name, output_folder + +# This is deprecated and will be removed +def iter_sorting_output(working_folder): + """Iterator over output_folder to retrieve all triplets of (rec_name, sorter_name, sorting).""" + for rec_name, sorter_name, output_folder in iter_working_folder(working_folder): + SorterClass = sorter_dict[sorter_name] + sorting = SorterClass.get_result_from_folder(output_folder) + yield rec_name, sorter_name, sorting + + + def setup_comparison_study(study_folder, gt_dict, **job_kwargs): """ Based on a dict of (recording, sorting) create the study folder. 
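The two iterators above are kept only for backward compatibility. A minimal sketch of the replacement pattern, assuming the sorting jobs have already finished and using illustrative folder names (:py:func:`read_sorter_folder` is the retrieval helper referenced later in this series):

.. code-block:: python

    from spikeinterface.sorters import read_sorter_folder

    # folders previously written by run_sorter() / run_sorter_jobs(); names are hypothetical
    output_folders = ["folder1", "folder2"]
    sortings = [read_sorter_folder(folder) for folder in output_folders]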
diff --git a/src/spikeinterface/sorters/__init__.py b/src/spikeinterface/sorters/__init__.py index a0d437559d..ba663327e8 100644 --- a/src/spikeinterface/sorters/__init__.py +++ b/src/spikeinterface/sorters/__init__.py @@ -1,11 +1,4 @@ from .basesorter import BaseSorter from .sorterlist import * from .runsorter import * - -from .launcher import ( - run_sorters, - run_sorter_by_property, - collect_sorting_outputs, - iter_working_folder, - iter_sorting_output, -) +from .launcher import run_sorter_jobs, run_sorters, run_sorter_by_property diff --git a/src/spikeinterface/sorters/basesorter.py b/src/spikeinterface/sorters/basesorter.py index ff559cc78d..aa76809b58 100644 --- a/src/spikeinterface/sorters/basesorter.py +++ b/src/spikeinterface/sorters/basesorter.py @@ -411,3 +411,14 @@ def get_job_kwargs(params, verbose): if not verbose: job_kwargs["progress_bar"] = False return job_kwargs + + +def is_log_ok(output_folder): + # log is OK when run_time is not None + if (output_folder / "spikeinterface_log.json").is_file(): + with open(output_folder / "spikeinterface_log.json", mode="r", encoding="utf8") as logfile: + log = json.load(logfile) + run_time = log.get("run_time", None) + ok = run_time is not None + return ok + return False \ No newline at end of file diff --git a/src/spikeinterface/sorters/launcher.py b/src/spikeinterface/sorters/launcher.py index 52098f45cd..138b4c5848 100644 --- a/src/spikeinterface/sorters/launcher.py +++ b/src/spikeinterface/sorters/launcher.py @@ -10,55 +10,148 @@ import stat import subprocess import sys +import warnings from spikeinterface.core import load_extractor, aggregate_units from spikeinterface.core.core_tools import check_json from .sorterlist import sorter_dict -from .runsorter import run_sorter, run_sorter - - -def _run_one(arg_list): - # the multiprocessing python module force to have one unique tuple argument - ( - sorter_name, - recording, - output_folder, - verbose, - sorter_params, - docker_image, - singularity_image, - with_output, - ) = arg_list - - if isinstance(recording, dict): - recording = load_extractor(recording) +from .runsorter import run_sorter +from .basesorter import is_log_ok + +_implemented_engine = ("loop", "joblib", "dask", "slurm") + +def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=False): + """ + Run several :py:func:`run_sorter()` sequencially or in parralel given a list of job. + + For **engine="loop"** this is equivalent to: + + ..code:: + + for job in job_list: + run_sorter(**job) + + For some engines, this function is blocking until the results ("loop", "joblib", "multiprocessing", "dask"). + For some other engine ("slurm") the function return almost immediatly (akak non blocking) and the results + must be retrieve by hand when finished with :py:func:`read_sorter_folder()`. + + Parameters + ---------- + job_list: list of dict + A list a dict that are propagated to run_sorter(...) + engine: str "loop", "joblib", "dask", "slurm" + The engine to run the list. + * "loop": a simple loop. This engine is + engine_kwargs: dict + + return_output: bool, dfault False + Return a sorting or None. + + Returns + ------- + sortings: None or list of sorting + With engine="loop" or "joblib" you can optional get directly the list of sorting result if return_output=True. 
+ """ + + assert engine in _implemented_engine, f"engine must be in {_implemented_engine}" + + if return_output: + assert engine in ("loop", "joblib", "multiprocessing") + out = [] else: - recording = recording - - # because this is checks in run_sorters before this call - remove_existing_folder = False - # result is retrieve later - delete_output_folder = False - # because we won't want the loop/worker to break - raise_error = False - - run_sorter( - sorter_name, - recording, - output_folder=output_folder, - remove_existing_folder=remove_existing_folder, - delete_output_folder=delete_output_folder, - verbose=verbose, - raise_error=raise_error, - docker_image=docker_image, - singularity_image=singularity_image, - with_output=with_output, - **sorter_params, - ) + out = None + + if engine == "loop": + # simple loop in main process + for kwargs in job_list: + sorting = run_sorter(**kwargs) + if return_output: + out.append(sorting) + + elif engine == "joblib": + from joblib import Parallel, delayed + + n_jobs = engine_kwargs.get("n_jobs", -1) + backend = engine_kwargs.get("backend", "loky") + sortings = Parallel(n_jobs=n_jobs, backend=backend)(delayed(run_sorter)(**kwargs) for kwargs in job_list) + if return_output: + out.extend(sortings) + + elif engine == "multiprocessing": + raise NotImplementedError() + + elif engine == "dask": + client = engine_kwargs.get("client", None) + assert client is not None, "For dask engine you have to provide : client = dask.distributed.Client(...)" + + tasks = [] + for kwargs in job_list: + task = client.submit(run_sorter, **kwargs) + tasks.append(task) + + for task in tasks: + task.result() + + elif engine == "slurm": + # generate python script for slurm + tmp_script_folder = engine_kwargs.get("tmp_script_folder", None) + if tmp_script_folder is None: + tmp_script_folder = tempfile.mkdtemp(prefix="spikeinterface_slurm_") + tmp_script_folder = Path(tmp_script_folder) + cpus_per_task = engine_kwargs.get("cpus_per_task", 1) + mem = engine_kwargs.get("mem", "1G") + + tmp_script_folder.mkdir(exist_ok=True, parents=True) + + # for i, task_args in enumerate(task_args_list): + for i, kwargs in enumerate(job_list): + script_name = tmp_script_folder / f"si_script_{i}.py" + with open(script_name, "w") as f: + kwargs_txt = "" + for k, v in kwargs.items(): + print(k, v) + kwargs_txt += " " + if k == "recording": + # put None temporally + kwargs_txt += "recording=None" + else: + if isinstance(v, str): + kwargs_txt += f'{k}="{v}"' + elif isinstance(v, Path): + kwargs_txt += f'{k}="{str(v.absolute())}"' + else: + kwargs_txt += f"{k}={v}" + kwargs_txt += ",\n" + + # recording_dict = task_args[1] + recording_dict = kwargs["recording"].to_dict() + slurm_script = _slurm_script.format( + python=sys.executable, recording_dict=recording_dict, kwargs_txt=kwargs_txt + ) + print(slurm_script) + f.write(slurm_script) + os.fchmod(f.fileno(), mode=stat.S_IRWXU) + + # subprocess.Popen(["sbatch", str(script_name.absolute()), f"-cpus-per-task={cpus_per_task}", f"-mem={mem}"]) + + return out + +_slurm_script = """#! 
{python} +from numpy import array +from spikeinterface.sorters import run_sorter + +rec_dict = {recording_dict} + +kwargs = dict( +{kwargs_txt} +) +kwargs['recording'] = load_extactor(rec_dict) + +run_sorter(**kwargs) +""" -_implemented_engine = ("loop", "joblib", "dask", "slurm") def run_sorter_by_property( @@ -66,7 +159,7 @@ def run_sorter_by_property( recording, grouping_property, working_folder, - mode_if_folder_exists="raise", + mode_if_folder_exists=None, engine="loop", engine_kwargs={}, verbose=False, @@ -93,11 +186,10 @@ def run_sorter_by_property( Property to split by before sorting working_folder: str The working directory. - mode_if_folder_exists: {'raise', 'overwrite', 'keep'} - The mode when the subfolder of recording/sorter already exists. - * 'raise' : raise error if subfolder exists - * 'overwrite' : delete and force recompute - * 'keep' : do not compute again if f=subfolder exists and log is OK + mode_if_folder_exists: None + Must be None. This is deprecated. + If not None then a warning is raise. + Will be removed in next release. engine: {'loop', 'joblib', 'dask'} Which engine to use to run sorter. engine_kwargs: dict @@ -127,46 +219,50 @@ def run_sorter_by_property( engine_kwargs={"n_jobs": 4}) """ + if mode_if_folder_exists is not None: + warnings.warn( + "run_sorter_by_property(): mode_if_folder_exists is not used anymore", + DeprecationWarning, + stacklevel=2, + ) + + working_folder = Path(working_folder).absolute() assert grouping_property in recording.get_property_keys(), ( f"The 'grouping_property' {grouping_property} is not " f"a recording property!" ) recording_dict = recording.split_by(grouping_property) - sorting_output = run_sorters( - [sorter_name], - recording_dict, - working_folder, - mode_if_folder_exists=mode_if_folder_exists, - engine=engine, - engine_kwargs=engine_kwargs, - verbose=verbose, - with_output=True, - docker_images={sorter_name: docker_image}, - singularity_images={sorter_name: singularity_image}, - sorter_params={sorter_name: sorter_params}, - ) - - grouping_property_values = None - sorting_list = [] - for output_name, sorting in sorting_output.items(): - prop_name, sorter_name = output_name - sorting_list.append(sorting) - if grouping_property_values is None: - grouping_property_values = np.array( - [prop_name] * len(sorting.get_unit_ids()), dtype=np.dtype(type(prop_name)) - ) - else: - grouping_property_values = np.concatenate( - (grouping_property_values, [prop_name] * len(sorting.get_unit_ids())) - ) + + job_list = [] + for k, rec in recording_dict.items(): + job = dict( + sorter_name=sorter_name, + recording=rec, + output_folder=working_folder / str(k), + verbose=verbose, + docker_image=docker_image, + singularity_image=singularity_image, + **sorter_params + ) + job_list.append(job) + + sorting_list = run_sorter_jobs(job_list, engine=engine, engine_kwargs=engine_kwargs, return_output=True) + + unit_groups = [] + for sorting, group in zip(sorting_list, recording_dict.keys()): + num_units = sorting.get_unit_ids().size + unit_groups.extend([group] * num_units) + unit_groups = np.array(unit_groups) aggregate_sorting = aggregate_units(sorting_list) - aggregate_sorting.set_property(key=grouping_property, values=grouping_property_values) + aggregate_sorting.set_property(key=grouping_property, values=unit_groups) aggregate_sorting.register_recording(recording) return aggregate_sorting + +# This is deprecated and will be removed def run_sorters( sorter_list, recording_dict_or_list, @@ -180,8 +276,10 @@ def run_sorters( docker_images={}, 
singularity_images={}, ): - """Run several sorter on several recordings. - + """ + This function is deprecated and will be removed. + Please use run_sorter_jobs() instead. + Parameters ---------- sorter_list: list of str @@ -221,6 +319,13 @@ def run_sorters( results : dict The output is nested dict[(rec_name, sorter_name)] of SortingExtractor. """ + + warnings.warn( + "run_sorters()is deprecated please use run_sorter_jobs() instead", + DeprecationWarning, + stacklevel=2, + ) + working_folder = Path(working_folder) mode_if_folder_exists in ("raise", "keep", "overwrite") @@ -247,8 +352,7 @@ def run_sorters( dtype_rec_name = np.dtype(type(list(recording_dict.keys())[0])) assert dtype_rec_name.kind in ("i", "u", "S", "U"), "Dict keys can only be integers or strings!" - need_dump = engine != "loop" - task_args_list = [] + job_list = [] for rec_name, recording in recording_dict.items(): for sorter_name in sorter_list: output_folder = working_folder / str(rec_name) / sorter_name @@ -260,6 +364,7 @@ def run_sorters( elif mode_if_folder_exists == "overwrite": shutil.rmtree(str(output_folder)) elif mode_if_folder_exists == "keep": + if is_log_ok(output_folder): continue else: @@ -268,181 +373,22 @@ def run_sorters( params = sorter_params.get(sorter_name, {}) docker_image = docker_images.get(sorter_name, None) singularity_image = singularity_images.get(sorter_name, None) - _check_container_images(docker_image, singularity_image, sorter_name) - - if need_dump: - if not recording.check_if_dumpable(): - raise Exception("recording not dumpable call recording.save() before") - recording_arg = recording.to_dict(recursive=True) - else: - recording_arg = recording - - task_args = ( - sorter_name, - recording_arg, - output_folder, - verbose, - params, - docker_image, - singularity_image, - with_output, - ) - task_args_list.append(task_args) - if engine == "loop": - # simple loop in main process - for task_args in task_args_list: - _run_one(task_args) - - elif engine == "joblib": - from joblib import Parallel, delayed - - n_jobs = engine_kwargs.get("n_jobs", -1) - backend = engine_kwargs.get("backend", "loky") - Parallel(n_jobs=n_jobs, backend=backend)(delayed(_run_one)(task_args) for task_args in task_args_list) - - elif engine == "dask": - client = engine_kwargs.get("client", None) - assert client is not None, "For dask engine you have to provide : client = dask.distributed.Client(...)" - - tasks = [] - for task_args in task_args_list: - task = client.submit(_run_one, task_args) - tasks.append(task) - - for task in tasks: - task.result() - - elif engine == "slurm": - # generate python script for slurm - tmp_script_folder = engine_kwargs.get("tmp_script_folder", None) - if tmp_script_folder is None: - tmp_script_folder = tempfile.mkdtemp(prefix="spikeinterface_slurm_") - tmp_script_folder = Path(tmp_script_folder) - cpus_per_task = engine_kwargs.get("cpus_per_task", 1) - mem = engine_kwargs.get("mem", "1G") - - for i, task_args in enumerate(task_args_list): - script_name = tmp_script_folder / f"si_script_{i}.py" - with open(script_name, "w") as f: - arg_list_txt = "(\n" - for j, arg in enumerate(task_args): - arg_list_txt += "\t" - if j != 1: - if isinstance(arg, str): - arg_list_txt += f'"{arg}"' - elif isinstance(arg, Path): - arg_list_txt += f'"{str(arg.absolute())}"' - else: - arg_list_txt += f"{arg}" - else: - arg_list_txt += "recording" - arg_list_txt += ",\r" - arg_list_txt += ")" - - recording_dict = task_args[1] - slurm_script = _slurm_script.format( - python=sys.executable, 
recording_dict=recording_dict, arg_list_txt=arg_list_txt - ) - f.write(slurm_script) - os.fchmod(f.fileno(), mode=stat.S_IRWXU) - - print(slurm_script) - - subprocess.Popen(["sbatch", str(script_name.absolute()), f"-cpus-per-task={cpus_per_task}", f"-mem={mem}"]) - - non_blocking_engine = ("loop", "joblib") - if engine in non_blocking_engine: - # dump spikeinterface_job.json - # only for non blocking engine - for rec_name, recording in recording_dict.items(): - for sorter_name in sorter_list: - output_folder = working_folder / str(rec_name) / sorter_name - with open(output_folder / "spikeinterface_job.json", "w") as f: - dump_dict = {"rec_name": rec_name, "sorter_name": sorter_name, "engine": engine} - if engine != "dask": - dump_dict.update({"engine_kwargs": engine_kwargs}) - json.dump(check_json(dump_dict), f) - - if with_output: - if engine not in non_blocking_engine: - print( - f'Warning!! With engine="{engine}" you cannot have directly output results\n' - "Use : run_sorters(..., with_output=False)\n" - "And then: results = collect_sorting_outputs(output_folders)" + job = dict( + sorter_name=sorter_name, + recording=recording, + output_folder=output_folder, + verbose=verbose, + docker_image=docker_image, + singularity_image=singularity_image, + **params ) - return + job_list.append(job) + + sorting_list = run_sorter_jobs(job_list, engine=engine, engine_kwargs=engine_kwargs, return_output=with_output) - results = collect_sorting_outputs(working_folder) + if with_output: + keys = [(rec_name, sorter_name) for rec_name in recording_dict for sorter_name in sorter_list ] + results = dict(zip(keys, sorting_list)) return results - -_slurm_script = """#! {python} -from numpy import array -from spikeinterface.sorters.launcher import _run_one - -recording = {recording_dict} - -arg_list = {arg_list_txt} - -_run_one(arg_list) -""" - - -def is_log_ok(output_folder): - # log is OK when run_time is not None - if (output_folder / "spikeinterface_log.json").is_file(): - with open(output_folder / "spikeinterface_log.json", mode="r", encoding="utf8") as logfile: - log = json.load(logfile) - run_time = log.get("run_time", None) - ok = run_time is not None - return ok - return False - - -def iter_working_folder(working_folder): - working_folder = Path(working_folder) - for rec_folder in working_folder.iterdir(): - if not rec_folder.is_dir(): - continue - for output_folder in rec_folder.iterdir(): - if (output_folder / "spikeinterface_job.json").is_file(): - with open(output_folder / "spikeinterface_job.json", "r") as f: - job_dict = json.load(f) - rec_name = job_dict["rec_name"] - sorter_name = job_dict["sorter_name"] - yield rec_name, sorter_name, output_folder - else: - rec_name = rec_folder.name - sorter_name = output_folder.name - if not output_folder.is_dir(): - continue - if not is_log_ok(output_folder): - continue - yield rec_name, sorter_name, output_folder - - -def iter_sorting_output(working_folder): - """Iterator over output_folder to retrieve all triplets of (rec_name, sorter_name, sorting).""" - for rec_name, sorter_name, output_folder in iter_working_folder(working_folder): - SorterClass = sorter_dict[sorter_name] - sorting = SorterClass.get_result_from_folder(output_folder) - yield rec_name, sorter_name, sorting - - -def collect_sorting_outputs(working_folder): - """Collect results in a working_folder. - - The output is a dict with double key access results[(rec_name, sorter_name)] of SortingExtractor. 
- """ - results = {} - for rec_name, sorter_name, sorting in iter_sorting_output(working_folder): - results[(rec_name, sorter_name)] = sorting - return results - - -def _check_container_images(docker_image, singularity_image, sorter_name): - if docker_image is not None: - assert singularity_image is None, f"Provide either a docker or a singularity image " f"for sorter {sorter_name}" - if singularity_image is not None: - assert docker_image is None, f"Provide either a docker or a singularity image " f"for sorter {sorter_name}" diff --git a/src/spikeinterface/sorters/tests/test_launcher.py b/src/spikeinterface/sorters/tests/test_launcher.py index cd8bc0fa5d..0d84dc0bdb 100644 --- a/src/spikeinterface/sorters/tests/test_launcher.py +++ b/src/spikeinterface/sorters/tests/test_launcher.py @@ -1,4 +1,5 @@ import os +import sys import shutil import time @@ -6,8 +7,9 @@ from pathlib import Path from spikeinterface.core import load_extractor -from spikeinterface.extractors import toy_example -from spikeinterface.sorters import run_sorters, run_sorter_by_property, collect_sorting_outputs +# from spikeinterface.extractors import toy_example +from spikeinterface import generate_ground_truth_recording +from spikeinterface.sorters import run_sorter_jobs, run_sorters, run_sorter_by_property if hasattr(pytest, "global_test_folder"): @@ -15,10 +17,16 @@ else: cache_folder = Path("cache_folder") / "sorters" +base_output = cache_folder / 'sorter_output' + +# no need to have many +num_recordings = 2 +sorters = ["tridesclous2"] def setup_module(): - rec, _ = toy_example(num_channels=8, duration=30, seed=0, num_segments=1) - for i in range(4): + base_seed = 42 + for i in range(num_recordings): + rec, _ = generate_ground_truth_recording(num_channels=8, durations=[10.0], seed=base_seed + i) rec_folder = cache_folder / f"toy_rec_{i}" if rec_folder.is_dir(): shutil.rmtree(rec_folder) @@ -31,19 +39,101 @@ def setup_module(): rec.save(folder=rec_folder) -def test_run_sorters_with_list(): - working_folder = cache_folder / "test_run_sorters_list" +def get_job_list(): + jobs = [] + for i in range(num_recordings): + for sorter_name in sorters: + recording = load_extractor(cache_folder / f"toy_rec_{i}") + kwargs = dict(sorter_name=sorter_name, + recording=recording, + output_folder=base_output / f"{sorter_name}_rec{i}", + verbose=True, + raise_error=False, + ) + jobs.append(kwargs) + + return jobs + +@pytest.fixture(scope="module") +def job_list(): + return get_job_list() + + + + + + + +################################ + + +def test_run_sorter_jobs_loop(job_list): + if base_output.is_dir(): + shutil.rmtree(base_output) + sortings = run_sorter_jobs(job_list, engine="loop", return_output=True) + print(sortings) + + +def test_run_sorter_jobs_joblib(job_list): + if base_output.is_dir(): + shutil.rmtree(base_output) + sortings = run_sorter_jobs(job_list, engine="joblib", engine_kwargs=dict(n_jobs=2, backend="loky"), return_output=True) + print(sortings) + +def test_run_sorter_jobs_multiprocessing(job_list): + pass + +@pytest.mark.skipif(True, reason="This is tested locally") +def test_run_sorter_jobs_dask(job_list): + if base_output.is_dir(): + shutil.rmtree(base_output) + + # create a dask Client for a slurm queue + from dask.distributed import Client + + test_mode = "local" + # test_mode = "client_slurm" + + if test_mode == "local": + client = Client() + elif test_mode == "client_slurm": + from dask_jobqueue import SLURMCluster + cluster = SLURMCluster( + processes=1, + cores=1, + memory="12GB", + python=sys.executable, + 
walltime="12:00:00", + ) + cluster.scale(2) + client = Client(cluster) + + # dask + t0 = time.perf_counter() + run_sorter_jobs(job_list, engine="dask", engine_kwargs=dict(client=client)) + t1 = time.perf_counter() + print(t1 - t0) + + +def test_run_sorter_jobs_slurm(job_list): + if base_output.is_dir(): + shutil.rmtree(base_output) + + working_folder = cache_folder / "test_run_sorters_slurm" if working_folder.is_dir(): shutil.rmtree(working_folder) - # make dumpable - rec0 = load_extractor(cache_folder / "toy_rec_0") - rec1 = load_extractor(cache_folder / "toy_rec_1") - - recording_list = [rec0, rec1] - sorter_list = ["tridesclous"] + tmp_script_folder = working_folder / "slurm_scripts" - run_sorters(sorter_list, recording_list, working_folder, engine="loop", verbose=False, with_output=False) + run_sorter_jobs( + job_list, + engine="slurm", + engine_kwargs=dict( + tmp_script_folder=tmp_script_folder, + cpus_per_task=32, + mem="32G", + ) + ) def test_run_sorter_by_property(): @@ -59,7 +149,7 @@ def test_run_sorter_by_property(): rec0_by = rec0.split_by("group") group_names0 = list(rec0_by.keys()) - sorter_name = "tridesclous" + sorter_name = "tridesclous2" sorting0 = run_sorter_by_property(sorter_name, rec0, "group", working_folder1, engine="loop", verbose=False) assert "group" in sorting0.get_property_keys() assert all([g in group_names0 for g in sorting0.get_property("group")]) @@ -68,13 +158,38 @@ def test_run_sorter_by_property(): rec1_by = rec1.split_by("group") group_names1 = list(rec1_by.keys()) - sorter_name = "tridesclous" + sorter_name = "tridesclous2" sorting1 = run_sorter_by_property(sorter_name, rec1, "group", working_folder2, engine="loop", verbose=False) assert "group" in sorting1.get_property_keys() assert all([g in group_names1 for g in sorting1.get_property("group")]) + +# run_sorters is deprecated +# This will test will be removed in next release +def test_run_sorters_with_list(): + + + working_folder = cache_folder / "test_run_sorters_list" + if working_folder.is_dir(): + shutil.rmtree(working_folder) + + # make dumpable + rec0 = load_extractor(cache_folder / "toy_rec_0") + rec1 = load_extractor(cache_folder / "toy_rec_1") + + recording_list = [rec0, rec1] + sorter_list = ["tridesclous2"] + + run_sorters(sorter_list, recording_list, working_folder, engine="loop", verbose=False, with_output=False) + + + + +# run_sorters is deprecated +# This will test will be removed in next release def test_run_sorters_with_dict(): + working_folder = cache_folder / "test_run_sorters_dict" if working_folder.is_dir(): shutil.rmtree(working_folder) @@ -84,9 +199,9 @@ def test_run_sorters_with_dict(): recording_dict = {"toy_tetrode": rec0, "toy_octotrode": rec1} - sorter_list = ["tridesclous", "tridesclous2"] + sorter_list = ["tridesclous2"] - sorter_params = {"tridesclous": dict(detect_threshold=5.6), "tridesclous2": dict()} + sorter_params = {"tridesclous2": dict()} # simple loop t0 = time.perf_counter() @@ -116,143 +231,23 @@ def test_run_sorters_with_dict(): ) -@pytest.mark.skipif(True, reason="This is tested locally") -def test_run_sorters_joblib(): - working_folder = cache_folder / "test_run_sorters_joblib" - if working_folder.is_dir(): - shutil.rmtree(working_folder) - - recording_dict = {} - for i in range(4): - rec = load_extractor(cache_folder / f"toy_rec_{i}") - recording_dict[f"rec_{i}"] = rec - - sorter_list = [ - "tridesclous", - ] - - # joblib - t0 = time.perf_counter() - run_sorters( - sorter_list, - recording_dict, - working_folder / "with_joblib", - engine="joblib", - 
engine_kwargs={"n_jobs": 4}, - with_output=False, - mode_if_folder_exists="keep", - ) - t1 = time.perf_counter() - print(t1 - t0) - - -@pytest.mark.skipif(True, reason="This is tested locally") -def test_run_sorters_dask(): - working_folder = cache_folder / "test_run_sorters_dask" - if working_folder.is_dir(): - shutil.rmtree(working_folder) - - recording_dict = {} - for i in range(4): - rec = load_extractor(cache_folder / f"toy_rec_{i}") - recording_dict[f"rec_{i}"] = rec - sorter_list = [ - "tridesclous", - ] - - # create a dask Client for a slurm queue - from dask.distributed import Client - from dask_jobqueue import SLURMCluster - - python = "/home/samuel.garcia/.virtualenvs/py36/bin/python3.6" - cluster = SLURMCluster( - processes=1, - cores=1, - memory="12GB", - python=python, - walltime="12:00:00", - ) - cluster.scale(5) - client = Client(cluster) - - # dask - t0 = time.perf_counter() - run_sorters( - sorter_list, - recording_dict, - working_folder, - engine="dask", - engine_kwargs={"client": client}, - with_output=False, - mode_if_folder_exists="keep", - ) - t1 = time.perf_counter() - print(t1 - t0) - - -@pytest.mark.skipif(True, reason="This is tested locally") -def test_run_sorters_slurm(): - working_folder = cache_folder / "test_run_sorters_slurm" - if working_folder.is_dir(): - shutil.rmtree(working_folder) - - # create recording - recording_dict = {} - for i in range(4): - rec = load_extractor(cache_folder / f"toy_rec_{i}") - recording_dict[f"rec_{i}"] = rec - - sorter_list = [ - "spykingcircus2", - "tridesclous2", - ] - - tmp_script_folder = working_folder / "slurm_scripts" - tmp_script_folder.mkdir(parents=True) - - run_sorters( - sorter_list, - recording_dict, - working_folder, - engine="slurm", - engine_kwargs={ - "tmp_script_folder": tmp_script_folder, - "cpus_per_task": 32, - "mem": "32G", - }, - with_output=False, - mode_if_folder_exists="keep", - verbose=True, - ) - - -def test_collect_sorting_outputs(): - working_folder = cache_folder / "test_run_sorters_dict" - results = collect_sorting_outputs(working_folder) - print(results) - - -def test_sorter_installation(): - # This import is to get error on github when import fails - import tridesclous - - # import circus if __name__ == "__main__": setup_module() - # pass - # test_run_sorters_with_list() + job_list = get_job_list() + + # test_run_sorter_jobs_loop(job_list) + # test_run_sorter_jobs_joblib(job_list) + # test_run_sorter_jobs_multiprocessing(job_list) + # test_run_sorter_jobs_dask(job_list) + # test_run_sorter_jobs_slurm(job_list) # test_run_sorter_by_property() + # this deprecated + test_run_sorters_with_list() test_run_sorters_with_dict() - # test_run_sorters_joblib() - - # test_run_sorters_dask() - - # test_run_sorters_slurm() - # test_collect_sorting_outputs() From 67dc176ec3305154adc7e0ce21b38b466c0fcd0b Mon Sep 17 00:00:00 2001 From: Garcia Samuel Date: Wed, 6 Sep 2023 15:12:06 +0200 Subject: [PATCH 02/12] Update doc/modules/sorters.rst Co-authored-by: Zach McKenzie <92116279+zm711@users.noreply.github.com> --- doc/modules/sorters.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/modules/sorters.rst b/doc/modules/sorters.rst index ad50f9e411..1843e80b8c 100644 --- a/doc/modules/sorters.rst +++ b/doc/modules/sorters.rst @@ -290,7 +290,7 @@ an :code:`engine` that supports parallel processing (such as :code:`joblib` or : .. code-block:: python - # here we run 2 sorters on 2 diffrents recording = 4 jobs + # here we run 2 sorters on 2 different recordings = 4 jobs recording = ... 
another_recording = ... From fe5052818fa4ddaed3f0e21fd657c9fe4151f988 Mon Sep 17 00:00:00 2001 From: Samuel Garcia Date: Wed, 6 Sep 2023 15:32:30 +0200 Subject: [PATCH 03/12] add engine="processpoolexecutor" --- src/spikeinterface/sorters/launcher.py | 56 ++++++++++++++----- .../sorters/tests/test_launcher.py | 27 ++++----- 2 files changed, 55 insertions(+), 28 deletions(-) diff --git a/src/spikeinterface/sorters/launcher.py b/src/spikeinterface/sorters/launcher.py index 138b4c5848..60be6e1286 100644 --- a/src/spikeinterface/sorters/launcher.py +++ b/src/spikeinterface/sorters/launcher.py @@ -4,7 +4,6 @@ from pathlib import Path import shutil import numpy as np -import json import tempfile import os import stat @@ -12,14 +11,22 @@ import sys import warnings -from spikeinterface.core import load_extractor, aggregate_units -from spikeinterface.core.core_tools import check_json +from spikeinterface.core import aggregate_units from .sorterlist import sorter_dict from .runsorter import run_sorter from .basesorter import is_log_ok -_implemented_engine = ("loop", "joblib", "dask", "slurm") +_default_engine_kwargs = dict( + loop=dict(), + joblib=dict(n_jobs=-1, backend="loky"), + processpoolexecutor=dict(max_workers=2, mp_context=None), + dask=dict(client=None), + slurm=dict(tmp_script_folder=None, cpus_per_task=1, mem="1G"), +) + + +_implemented_engine = list(_default_engine_kwargs.keys()) def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=False): """ @@ -56,8 +63,15 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal assert engine in _implemented_engine, f"engine must be in {_implemented_engine}" + engine_kwargs_ = dict() + engine_kwargs_.update(_default_engine_kwargs[engine]) + engine_kwargs_.update(engine_kwargs) + engine_kwargs = engine_kwargs_ + + + if return_output: - assert engine in ("loop", "joblib", "multiprocessing") + assert engine in ("loop", "joblib", "processpoolexecutor") out = [] else: out = None @@ -72,17 +86,30 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal elif engine == "joblib": from joblib import Parallel, delayed - n_jobs = engine_kwargs.get("n_jobs", -1) - backend = engine_kwargs.get("backend", "loky") + n_jobs = engine_kwargs["n_jobs"] + backend = engine_kwargs["backend"] sortings = Parallel(n_jobs=n_jobs, backend=backend)(delayed(run_sorter)(**kwargs) for kwargs in job_list) if return_output: out.extend(sortings) - elif engine == "multiprocessing": - raise NotImplementedError() + elif engine == "processpoolexecutor": + from concurrent.futures import ProcessPoolExecutor + + max_workers = engine_kwargs["max_workers"] + mp_context = engine_kwargs["mp_context"] + + with ProcessPoolExecutor(max_workers=max_workers, mp_context=mp_context) as executor: + futures = [] + for kwargs in job_list: + res = executor.submit(run_sorter, **kwargs) + futures.append(res) + for futur in futures: + sorting = futur.result() + if return_output: + out.append(sorting) elif engine == "dask": - client = engine_kwargs.get("client", None) + client = engine_kwargs["client"] assert client is not None, "For dask engine you have to provide : client = dask.distributed.Client(...)" tasks = [] @@ -95,16 +122,15 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal elif engine == "slurm": # generate python script for slurm - tmp_script_folder = engine_kwargs.get("tmp_script_folder", None) + tmp_script_folder = engine_kwargs["tmp_script_folder"] if tmp_script_folder is None: 
tmp_script_folder = tempfile.mkdtemp(prefix="spikeinterface_slurm_") tmp_script_folder = Path(tmp_script_folder) - cpus_per_task = engine_kwargs.get("cpus_per_task", 1) - mem = engine_kwargs.get("mem", "1G") + cpus_per_task = engine_kwargs["cpus_per_task"] + mem = engine_kwargs["mem"] tmp_script_folder.mkdir(exist_ok=True, parents=True) - # for i, task_args in enumerate(task_args_list): for i, kwargs in enumerate(job_list): script_name = tmp_script_folder / f"si_script_{i}.py" with open(script_name, "w") as f: @@ -133,7 +159,7 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal f.write(slurm_script) os.fchmod(f.fileno(), mode=stat.S_IRWXU) - # subprocess.Popen(["sbatch", str(script_name.absolute()), f"-cpus-per-task={cpus_per_task}", f"-mem={mem}"]) + subprocess.Popen(["sbatch", str(script_name.absolute()), f"-cpus-per-task={cpus_per_task}", f"-mem={mem}"]) return out diff --git a/src/spikeinterface/sorters/tests/test_launcher.py b/src/spikeinterface/sorters/tests/test_launcher.py index 0d84dc0bdb..c1f8b6e0bb 100644 --- a/src/spikeinterface/sorters/tests/test_launcher.py +++ b/src/spikeinterface/sorters/tests/test_launcher.py @@ -59,14 +59,6 @@ def job_list(): return get_job_list() - - - - - -################################ - - def test_run_sorter_jobs_loop(job_list): if base_output.is_dir(): shutil.rmtree(base_output) @@ -74,14 +66,22 @@ def test_run_sorter_jobs_loop(job_list): print(sortings) + + def test_run_sorter_jobs_joblib(job_list): if base_output.is_dir(): shutil.rmtree(base_output) sortings = run_sorter_jobs(job_list, engine="joblib", engine_kwargs=dict(n_jobs=2, backend="loky"), return_output=True) print(sortings) -def test_run_sorter_jobs_multiprocessing(job_list): - pass +def test_run_sorter_jobs_processpoolexecutor(job_list): + if base_output.is_dir(): + shutil.rmtree(base_output) + sortings = run_sorter_jobs(job_list, engine="processpoolexecutor", engine_kwargs=dict(max_workers=2), return_output=True) + print(sortings) + + + @pytest.mark.skipif(True, reason="This is tested locally") def test_run_sorter_jobs_dask(job_list): @@ -235,11 +235,12 @@ def test_run_sorters_with_dict(): if __name__ == "__main__": - setup_module() + # setup_module() job_list = get_job_list() # test_run_sorter_jobs_loop(job_list) # test_run_sorter_jobs_joblib(job_list) + test_run_sorter_jobs_processpoolexecutor(job_list) # test_run_sorter_jobs_multiprocessing(job_list) # test_run_sorter_jobs_dask(job_list) # test_run_sorter_jobs_slurm(job_list) @@ -247,7 +248,7 @@ def test_run_sorters_with_dict(): # test_run_sorter_by_property() # this deprecated - test_run_sorters_with_list() - test_run_sorters_with_dict() + # test_run_sorters_with_list() + # test_run_sorters_with_dict() From f4b7c3caad2011606bf19a70c69d098a3922f277 Mon Sep 17 00:00:00 2001 From: Samuel Garcia Date: Wed, 6 Sep 2023 16:29:17 +0200 Subject: [PATCH 04/12] debug slurm launcher --- src/spikeinterface/sorters/launcher.py | 5 ++--- src/spikeinterface/sorters/tests/test_launcher.py | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/spikeinterface/sorters/launcher.py b/src/spikeinterface/sorters/launcher.py index 60be6e1286..6f3b972fdd 100644 --- a/src/spikeinterface/sorters/launcher.py +++ b/src/spikeinterface/sorters/launcher.py @@ -136,7 +136,6 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal with open(script_name, "w") as f: kwargs_txt = "" for k, v in kwargs.items(): - print(k, v) kwargs_txt += " " if k == "recording": # put None temporally @@ 
-155,7 +154,6 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal slurm_script = _slurm_script.format( python=sys.executable, recording_dict=recording_dict, kwargs_txt=kwargs_txt ) - print(slurm_script) f.write(slurm_script) os.fchmod(f.fileno(), mode=stat.S_IRWXU) @@ -165,6 +163,7 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal _slurm_script = """#! {python} from numpy import array +from spikeinterface import load_extractor from spikeinterface.sorters import run_sorter rec_dict = {recording_dict} @@ -172,7 +171,7 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal kwargs = dict( {kwargs_txt} ) -kwargs['recording'] = load_extactor(rec_dict) +kwargs['recording'] = load_extractor(rec_dict) run_sorter(**kwargs) """ diff --git a/src/spikeinterface/sorters/tests/test_launcher.py b/src/spikeinterface/sorters/tests/test_launcher.py index c1f8b6e0bb..2d8e6f3d3c 100644 --- a/src/spikeinterface/sorters/tests/test_launcher.py +++ b/src/spikeinterface/sorters/tests/test_launcher.py @@ -240,10 +240,10 @@ def test_run_sorters_with_dict(): # test_run_sorter_jobs_loop(job_list) # test_run_sorter_jobs_joblib(job_list) - test_run_sorter_jobs_processpoolexecutor(job_list) + # test_run_sorter_jobs_processpoolexecutor(job_list) # test_run_sorter_jobs_multiprocessing(job_list) # test_run_sorter_jobs_dask(job_list) - # test_run_sorter_jobs_slurm(job_list) + test_run_sorter_jobs_slurm(job_list) # test_run_sorter_by_property() From 93de4db5596a7c4ff3cc2925f6c702a9cabf7703 Mon Sep 17 00:00:00 2001 From: Garcia Samuel Date: Wed, 6 Sep 2023 16:25:42 +0200 Subject: [PATCH 05/12] Update doc/modules/sorters.rst Co-authored-by: Alessio Buccino --- doc/modules/sorters.rst | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/doc/modules/sorters.rst b/doc/modules/sorters.rst index 1843e80b8c..d17927cc42 100644 --- a/doc/modules/sorters.rst +++ b/doc/modules/sorters.rst @@ -295,10 +295,10 @@ an :code:`engine` that supports parallel processing (such as :code:`joblib` or : another_recording = ... 
     job_list = [
-        {'sorter_name': 'tridesclous', 'recording': recording, 'output_folder': '/folder1','detect_threshold': 5.},
-        {'sorter_name': 'tridesclous', 'recording': another_recording, 'output_folder': '/folder2', 'detect_threshold': 5.},
-        {'sorter_name': 'herdingspikes', 'recording': recording, 'output_folder': '/folder3', 'clustering_bandwidth': 8., 'docker_image': True},
-        {'sorter_name': 'herdingspikes', 'recording': another_recording, 'output_folder': '/folder4', 'clustering_bandwidth': 8., 'docker_image': True},
+        {'sorter_name': 'tridesclous', 'recording': recording, 'output_folder': 'folder1','detect_threshold': 5.},
+        {'sorter_name': 'tridesclous', 'recording': another_recording, 'output_folder': 'folder2', 'detect_threshold': 5.},
+        {'sorter_name': 'herdingspikes', 'recording': recording, 'output_folder': 'folder3', 'clustering_bandwidth': 8., 'docker_image': True},
+        {'sorter_name': 'herdingspikes', 'recording': another_recording, 'output_folder': 'folder4', 'clustering_bandwidth': 8., 'docker_image': True},
     ]
 
     # run in loop

From fe2d7c532611add92ea46d877f051a396ead6ced Mon Sep 17 00:00:00 2001
From: Garcia Samuel
Date: Thu, 7 Sep 2023 19:08:32 +0200
Subject: [PATCH 06/12] Suggestions from Zach

Co-authored-by: Zach McKenzie <92116279+zm711@users.noreply.github.com>
---
 src/spikeinterface/sorters/launcher.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/spikeinterface/sorters/launcher.py b/src/spikeinterface/sorters/launcher.py
index 6f3b972fdd..103f30dac5 100644
--- a/src/spikeinterface/sorters/launcher.py
+++ b/src/spikeinterface/sorters/launcher.py
@@ -30,7 +30,7 @@
 
 def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=False):
     """
-    Run several :py:func:`run_sorter()` sequencially or in parralel given a list of job.
+    Run several :py:func:`run_sorter()` sequentially or in parallel given a list of jobs.
 
     For **engine="loop"** this is equivalent to:
 
@@ -39,9 +39,9 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal
         for job in job_list:
             run_sorter(**job)
 
-    For some engines, this function is blocking until the results ("loop", "joblib", "multiprocessing", "dask").
-    For some other engine ("slurm") the function return almost immediatly (akak non blocking) and the results
-    must be retrieve by hand when finished with :py:func:`read_sorter_folder()`.
+    For some engines ("loop", "joblib", "multiprocessing", "dask"), this function is blocking until the results .
+    For other engines ("slurm") the function returns almost immediately (aka non-blocking) and the results
+    must be retrieved by hand when finished with :py:func:`read_sorter_folder()`.

From 6a364b03e3c6504969a9bdcf7eecf6885384ddb3 Mon Sep 17 00:00:00 2001
From: Samuel Garcia
Date: Tue, 12 Sep 2023 10:28:55 +0200
Subject: [PATCH 07/12] Feedback from Zach and Ramon

---
 src/spikeinterface/sorters/launcher.py | 22 +++++++++++++++++-----
 1 file changed, 17 insertions(+), 5 deletions(-)

diff --git a/src/spikeinterface/sorters/launcher.py b/src/spikeinterface/sorters/launcher.py
index 103f30dac5..b158eba22d 100644
--- a/src/spikeinterface/sorters/launcher.py
+++ b/src/spikeinterface/sorters/launcher.py
@@ -39,9 +39,21 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal
         for job in job_list:
             run_sorter(**job)
 
-    For some engines ("loop", "joblib", "multiprocessing", "dask"), this function is blocking until the results . 
- For other engines ("slurm") the function returns almost immediately (aka non-blocking) and the results - must be retrieved by hand when finished with :py:func:`read_sorter_folder()`. + The following engines block the I/O: + * "loop" + * "joblib" + * "multiprocessing" + * "dask" + + The following engines are *asynchronous*: + * "slurm" + + Where *blocking* means that this function is blocking until the results are returned. + This is in opposition to *asynchronous*, where the function returns `None` almost immediately (aka non-blocking), + but the results must be retrieved by hand when jobs are finished. No mechanisim is provided here to be aware + when jobs are finish. + In this *asynchronous* case, the :py:func:read_sorter_folder() helps to retrieve individual results. + Parameters ---------- @@ -302,7 +314,7 @@ def run_sorters( singularity_images={}, ): """ - This function is deprecated and will be removed. + This function is deprecated and will be removed in version 0.100 Please use run_sorter_jobs() instead. Parameters @@ -346,7 +358,7 @@ def run_sorters( """ warnings.warn( - "run_sorters()is deprecated please use run_sorter_jobs() instead", + "run_sorters()is deprecated please use run_sorter_jobs() instead. This will be removed in 0.100", DeprecationWarning, stacklevel=2, ) From 1b28837a452da62e6890019bcb311cb5ced4009e Mon Sep 17 00:00:00 2001 From: Samuel Garcia Date: Wed, 13 Sep 2023 13:45:38 +0200 Subject: [PATCH 08/12] skip slurm tests --- src/spikeinterface/sorters/tests/test_launcher.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/spikeinterface/sorters/tests/test_launcher.py b/src/spikeinterface/sorters/tests/test_launcher.py index 2d8e6f3d3c..ecab64ede6 100644 --- a/src/spikeinterface/sorters/tests/test_launcher.py +++ b/src/spikeinterface/sorters/tests/test_launcher.py @@ -115,6 +115,7 @@ def test_run_sorter_jobs_dask(job_list): print(t1 - t0) +@pytest.mark.skip("Slurm launcher need a machine with slurm") def test_run_sorter_jobs_slurm(job_list): if base_output.is_dir(): shutil.rmtree(base_output) From 7aa96d3a81c685dfb9d242fc5e3057d352c376dd Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 15 Sep 2023 13:31:16 +0000 Subject: [PATCH 09/12] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/spikeinterface/comparison/studytools.py | 5 +- src/spikeinterface/sorters/basesorter.py | 2 +- src/spikeinterface/sorters/launcher.py | 45 ++++++++--------- .../sorters/tests/test_launcher.py | 49 +++++++++---------- 4 files changed, 44 insertions(+), 57 deletions(-) diff --git a/src/spikeinterface/comparison/studytools.py b/src/spikeinterface/comparison/studytools.py index 00119c1586..26d2c1ad6f 100644 --- a/src/spikeinterface/comparison/studytools.py +++ b/src/spikeinterface/comparison/studytools.py @@ -29,9 +29,6 @@ from .paircomparisons import compare_sorter_to_ground_truth - - - # This is deprecated and will be removed def iter_working_folder(working_folder): working_folder = Path(working_folder) @@ -54,6 +51,7 @@ def iter_working_folder(working_folder): continue yield rec_name, sorter_name, output_folder + # This is deprecated and will be removed def iter_sorting_output(working_folder): """Iterator over output_folder to retrieve all triplets of (rec_name, sorter_name, sorting).""" @@ -63,7 +61,6 @@ def iter_sorting_output(working_folder): yield rec_name, sorter_name, sorting - def setup_comparison_study(study_folder, gt_dict, **job_kwargs): """ 
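The engine configuration preserved by this formatting pass follows a simple rule: the per-engine defaults in ``_default_engine_kwargs`` (introduced in PATCH 03) are overlaid with the user-supplied ``engine_kwargs``. A standalone sketch of that merge; ``resolve_engine_kwargs`` is a hypothetical helper name used only for illustration:

.. code-block:: python

    # Per-engine defaults, mirroring _default_engine_kwargs from PATCH 03.
    _default_engine_kwargs = dict(
        loop=dict(),
        joblib=dict(n_jobs=-1, backend="loky"),
        processpoolexecutor=dict(max_workers=2, mp_context=None),
        dask=dict(client=None),
        slurm=dict(tmp_script_folder=None, cpus_per_task=1, mem="1G"),
    )

    def resolve_engine_kwargs(engine, engine_kwargs):
        resolved = dict(_default_engine_kwargs[engine])  # copy the engine defaults
        resolved.update(engine_kwargs)                   # user-supplied values take precedence
        return resolved

    print(resolve_engine_kwargs("joblib", {"n_jobs": 2}))  # {'n_jobs': 2, 'backend': 'loky'}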
Based on a dict of (recording, sorting) create the study folder. diff --git a/src/spikeinterface/sorters/basesorter.py b/src/spikeinterface/sorters/basesorter.py index aa76809b58..c7581ba1e1 100644 --- a/src/spikeinterface/sorters/basesorter.py +++ b/src/spikeinterface/sorters/basesorter.py @@ -421,4 +421,4 @@ def is_log_ok(output_folder): run_time = log.get("run_time", None) ok = run_time is not None return ok - return False \ No newline at end of file + return False diff --git a/src/spikeinterface/sorters/launcher.py b/src/spikeinterface/sorters/launcher.py index b158eba22d..d04a89fdf1 100644 --- a/src/spikeinterface/sorters/launcher.py +++ b/src/spikeinterface/sorters/launcher.py @@ -11,7 +11,7 @@ import sys import warnings -from spikeinterface.core import aggregate_units +from spikeinterface.core import aggregate_units from .sorterlist import sorter_dict from .runsorter import run_sorter @@ -28,6 +28,7 @@ _implemented_engine = list(_default_engine_kwargs.keys()) + def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=False): """ Run several :py:func:`run_sorter()` sequentially or in parallel given a list of jobs. @@ -38,18 +39,18 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal for job in job_list: run_sorter(**job) - + The following engines block the I/O: * "loop" * "joblib" * "multiprocessing" * "dask" - + The following engines are *asynchronous*: * "slurm" - + Where *blocking* means that this function is blocking until the results are returned. - This is in opposition to *asynchronous*, where the function returns `None` almost immediately (aka non-blocking), + This is in opposition to *asynchronous*, where the function returns `None` almost immediately (aka non-blocking), but the results must be retrieved by hand when jobs are finished. No mechanisim is provided here to be aware when jobs are finish. In this *asynchronous* case, the :py:func:read_sorter_folder() helps to retrieve individual results. @@ -61,7 +62,7 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal A list a dict that are propagated to run_sorter(...) engine: str "loop", "joblib", "dask", "slurm" The engine to run the list. - * "loop": a simple loop. This engine is + * "loop": a simple loop. This engine is engine_kwargs: dict return_output: bool, dfault False @@ -79,8 +80,6 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal engine_kwargs_.update(_default_engine_kwargs[engine]) engine_kwargs_.update(engine_kwargs) engine_kwargs = engine_kwargs_ - - if return_output: assert engine in ("loop", "joblib", "processpoolexecutor") @@ -109,7 +108,7 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal max_workers = engine_kwargs["max_workers"] mp_context = engine_kwargs["mp_context"] - + with ProcessPoolExecutor(max_workers=max_workers, mp_context=mp_context) as executor: futures = [] for kwargs in job_list: @@ -173,6 +172,7 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal return out + _slurm_script = """#! 
{python} from numpy import array from spikeinterface import load_extractor @@ -189,8 +189,6 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal """ - - def run_sorter_by_property( sorter_name, recording, @@ -258,10 +256,10 @@ def run_sorter_by_property( """ if mode_if_folder_exists is not None: warnings.warn( - "run_sorter_by_property(): mode_if_folder_exists is not used anymore", - DeprecationWarning, - stacklevel=2, - ) + "run_sorter_by_property(): mode_if_folder_exists is not used anymore", + DeprecationWarning, + stacklevel=2, + ) working_folder = Path(working_folder).absolute() @@ -269,7 +267,7 @@ def run_sorter_by_property( f"The 'grouping_property' {grouping_property} is not " f"a recording property!" ) recording_dict = recording.split_by(grouping_property) - + job_list = [] for k, rec in recording_dict.items(): job = dict( @@ -279,10 +277,10 @@ def run_sorter_by_property( verbose=verbose, docker_image=docker_image, singularity_image=singularity_image, - **sorter_params + **sorter_params, ) job_list.append(job) - + sorting_list = run_sorter_jobs(job_list, engine=engine, engine_kwargs=engine_kwargs, return_output=True) unit_groups = [] @@ -298,7 +296,6 @@ def run_sorter_by_property( return aggregate_sorting - # This is deprecated and will be removed def run_sorters( sorter_list, @@ -316,7 +313,7 @@ def run_sorters( """ This function is deprecated and will be removed in version 0.100 Please use run_sorter_jobs() instead. - + Parameters ---------- sorter_list: list of str @@ -401,7 +398,6 @@ def run_sorters( elif mode_if_folder_exists == "overwrite": shutil.rmtree(str(output_folder)) elif mode_if_folder_exists == "keep": - if is_log_ok(output_folder): continue else: @@ -418,14 +414,13 @@ def run_sorters( verbose=verbose, docker_image=docker_image, singularity_image=singularity_image, - **params + **params, ) job_list.append(job) - + sorting_list = run_sorter_jobs(job_list, engine=engine, engine_kwargs=engine_kwargs, return_output=with_output) if with_output: - keys = [(rec_name, sorter_name) for rec_name in recording_dict for sorter_name in sorter_list ] + keys = [(rec_name, sorter_name) for rec_name in recording_dict for sorter_name in sorter_list] results = dict(zip(keys, sorting_list)) return results - diff --git a/src/spikeinterface/sorters/tests/test_launcher.py b/src/spikeinterface/sorters/tests/test_launcher.py index ecab64ede6..14c938f8ba 100644 --- a/src/spikeinterface/sorters/tests/test_launcher.py +++ b/src/spikeinterface/sorters/tests/test_launcher.py @@ -7,6 +7,7 @@ from pathlib import Path from spikeinterface.core import load_extractor + # from spikeinterface.extractors import toy_example from spikeinterface import generate_ground_truth_recording from spikeinterface.sorters import run_sorter_jobs, run_sorters, run_sorter_by_property @@ -17,12 +18,13 @@ else: cache_folder = Path("cache_folder") / "sorters" -base_output = cache_folder / 'sorter_output' +base_output = cache_folder / "sorter_output" # no need to have many num_recordings = 2 sorters = ["tridesclous2"] + def setup_module(): base_seed = 42 for i in range(num_recordings): @@ -44,16 +46,18 @@ def get_job_list(): for i in range(num_recordings): for sorter_name in sorters: recording = load_extractor(cache_folder / f"toy_rec_{i}") - kwargs = dict(sorter_name=sorter_name, - recording=recording, - output_folder=base_output / f"{sorter_name}_rec{i}", - verbose=True, - raise_error=False, - ) + kwargs = dict( + sorter_name=sorter_name, + recording=recording, + output_folder=base_output / 
f"{sorter_name}_rec{i}", + verbose=True, + raise_error=False, + ) jobs.append(kwargs) - + return jobs + @pytest.fixture(scope="module") def job_list(): return get_job_list() @@ -66,23 +70,24 @@ def test_run_sorter_jobs_loop(job_list): print(sortings) - - def test_run_sorter_jobs_joblib(job_list): if base_output.is_dir(): shutil.rmtree(base_output) - sortings = run_sorter_jobs(job_list, engine="joblib", engine_kwargs=dict(n_jobs=2, backend="loky"), return_output=True) + sortings = run_sorter_jobs( + job_list, engine="joblib", engine_kwargs=dict(n_jobs=2, backend="loky"), return_output=True + ) print(sortings) + def test_run_sorter_jobs_processpoolexecutor(job_list): if base_output.is_dir(): shutil.rmtree(base_output) - sortings = run_sorter_jobs(job_list, engine="processpoolexecutor", engine_kwargs=dict(max_workers=2), return_output=True) + sortings = run_sorter_jobs( + job_list, engine="processpoolexecutor", engine_kwargs=dict(max_workers=2), return_output=True + ) print(sortings) - - @pytest.mark.skipif(True, reason="This is tested locally") def test_run_sorter_jobs_dask(job_list): if base_output.is_dir(): @@ -92,12 +97,13 @@ def test_run_sorter_jobs_dask(job_list): from dask.distributed import Client test_mode = "local" - # test_mode = "client_slurm" + # test_mode = "client_slurm" if test_mode == "local": client = Client() elif test_mode == "client_slurm": from dask_jobqueue import SLURMCluster + cluster = SLURMCluster( processes=1, cores=1, @@ -133,7 +139,7 @@ def test_run_sorter_jobs_slurm(job_list): tmp_script_folder=tmp_script_folder, cpus_per_task=32, mem="32G", - ) + ), ) @@ -165,12 +171,9 @@ def test_run_sorter_by_property(): assert all([g in group_names1 for g in sorting1.get_property("group")]) - # run_sorters is deprecated # This will test will be removed in next release def test_run_sorters_with_list(): - - working_folder = cache_folder / "test_run_sorters_list" if working_folder.is_dir(): shutil.rmtree(working_folder) @@ -185,12 +188,9 @@ def test_run_sorters_with_list(): run_sorters(sorter_list, recording_list, working_folder, engine="loop", verbose=False, with_output=False) - - # run_sorters is deprecated # This will test will be removed in next release def test_run_sorters_with_dict(): - working_folder = cache_folder / "test_run_sorters_dict" if working_folder.is_dir(): shutil.rmtree(working_folder) @@ -232,9 +232,6 @@ def test_run_sorters_with_dict(): ) - - - if __name__ == "__main__": # setup_module() job_list = get_job_list() @@ -251,5 +248,3 @@ def test_run_sorters_with_dict(): # this deprecated # test_run_sorters_with_list() # test_run_sorters_with_dict() - - From 5c0bdbb546fd121db38cc9c5123360f7534eb94a Mon Sep 17 00:00:00 2001 From: Garcia Samuel Date: Tue, 19 Sep 2023 17:59:48 +0200 Subject: [PATCH 10/12] Suggestions from Alessio Co-authored-by: Alessio Buccino --- src/spikeinterface/sorters/launcher.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/spikeinterface/sorters/launcher.py b/src/spikeinterface/sorters/launcher.py index d04a89fdf1..d6506cade5 100644 --- a/src/spikeinterface/sorters/launcher.py +++ b/src/spikeinterface/sorters/launcher.py @@ -51,9 +51,9 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal Where *blocking* means that this function is blocking until the results are returned. This is in opposition to *asynchronous*, where the function returns `None` almost immediately (aka non-blocking), - but the results must be retrieved by hand when jobs are finished. 
No mechanisim is provided here to be aware
+    but the results must be retrieved by hand when jobs are finished. No mechanisim is provided here to be know
     when jobs are finish.
-    In this *asynchronous* case, the :py:func:read_sorter_folder() helps to retrieve individual results.
+    In this *asynchronous* case, the :py:func:`~spikeinterface.sorters.read_sorter_folder()` helps to retrieve individual results.
 
 
     Parameters
@@ -82,7 +82,7 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal
         engine_kwargs = engine_kwargs_
 
     if return_output:
-        assert engine in ("loop", "joblib", "processpoolexecutor")
+        assert engine in ("loop", "joblib", "processpoolexecutor"), "Only 'loop', 'joblib', and 'processpoolexecutor' support return_output=True."
         out = []
     else:
         out = None
@@ -355,7 +355,7 @@ def run_sorters(
     """
 
     warnings.warn(
-        "run_sorters()is deprecated please use run_sorter_jobs() instead. This will be removed in 0.100",
+        "run_sorters() is deprecated please use run_sorter_jobs() instead. This will be removed in 0.100",
         DeprecationWarning,
         stacklevel=2,
     )

From 0ecf83b46dacf5426b7f55157f0d48497eb52245 Mon Sep 17 00:00:00 2001
From: Samuel Garcia
Date: Tue, 19 Sep 2023 18:02:51 +0200
Subject: [PATCH 11/12] add read_sorter_folder in api.rst

---
 doc/api.rst | 1 +
 1 file changed, 1 insertion(+)

diff --git a/doc/api.rst b/doc/api.rst
index b605127426..8b269fc685 100644
--- a/doc/api.rst
+++ b/doc/api.rst
@@ -219,6 +219,7 @@ spikeinterface.sorters
     .. autofunction:: run_sorter_jobs
     .. autofunction:: run_sorters
     .. autofunction:: run_sorter_by_property
+    .. autofunction:: read_sorter_folder

 Low level
 ~~~~~~~~~

From 60e8989d3207f9ad213d96484c767a53b7e535a2 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Tue, 19 Sep 2023 16:05:28 +0000
Subject: [PATCH 12/12] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 src/spikeinterface/sorters/launcher.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/src/spikeinterface/sorters/launcher.py b/src/spikeinterface/sorters/launcher.py
index d6506cade5..f32a468a22 100644
--- a/src/spikeinterface/sorters/launcher.py
+++ b/src/spikeinterface/sorters/launcher.py
@@ -82,7 +82,11 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal
         engine_kwargs = engine_kwargs_
 
     if return_output:
-        assert engine in ("loop", "joblib", "processpoolexecutor"), "Only 'loop', 'joblib', and 'processpoolexecutor' support return_output=True."
+        assert engine in (
+            "loop",
+            "joblib",
+            "processpoolexecutor",
+        ), "Only 'loop', 'joblib', and 'processpoolexecutor' support return_output=True."
         out = []
     else:
         out = None
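Taken together, the series leaves :py:func:`run_sorter_jobs` as the single entry point for batch sorting. A minimal end-to-end sketch of the final API, assuming the ``tridesclous2`` sorter is installed; the recording setup mirrors the patched tests and the folder names are illustrative:

.. code-block:: python

    from spikeinterface import generate_ground_truth_recording
    from spikeinterface.sorters import run_sorter_jobs

    # small synthetic recording, as in the patched test setup
    recording, _ = generate_ground_truth_recording(num_channels=8, durations=[10.0], seed=42)

    job_list = [
        dict(sorter_name="tridesclous2", recording=recording, output_folder="folder_a"),
        dict(sorter_name="tridesclous2", recording=recording, output_folder="folder_b"),
    ]

    # blocking engines ("loop", "joblib", "processpoolexecutor") can return the sortings directly
    sortings = run_sorter_jobs(
        job_list,
        engine="processpoolexecutor",
        engine_kwargs=dict(max_workers=2),
        return_output=True,
    )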