From 9a59e574dd25176a8da9b8142d6e87aeed3c5f74 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Tue, 7 Jun 2022 13:26:32 -0500 Subject: [PATCH 1/6] improve logic for running kilosort modules in a resumable fashion --- .../readers/kilosort_triggering.py | 33 +++++++++++++++++-- element_array_ephys/readers/openephys.py | 2 +- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/element_array_ephys/readers/kilosort_triggering.py b/element_array_ephys/readers/kilosort_triggering.py index efe73949..1c746373 100644 --- a/element_array_ephys/readers/kilosort_triggering.py +++ b/element_array_ephys/readers/kilosort_triggering.py @@ -7,7 +7,7 @@ import os import scipy.io import numpy as np -from datetime import datetime +from datetime import datetime, timedelta from element_interface.utils import dict_to_uuid @@ -191,8 +191,7 @@ def run_modules(self): if module_status['completion_time'] is not None: continue - module_output_json = module_input_json.replace('-input.json', - '-' + module + '-output.json') + module_output_json = self._get_module_output_json_filename(module) command = (sys.executable + " -W ignore -m ecephys_spike_sorting.modules." + module + " --input_json " + module_input_json @@ -233,11 +232,19 @@ def _update_module_status(self, updated_module_status={}): with open(self._modules_input_hash_fp) as f: modules_status = json.load(f) modules_status = {**modules_status, **updated_module_status} + modules_status['cumulative_execution_duration'] = sum( + v['duration'] or 0 for k, v in modules_status.items() + if k not in ('cumulative_execution_duration', 'total_duration')) + modules_status['total_duration'] = ( + modules_status[self._modules[-1]]['completion_time'] + - modules_status[self._modules[0]]['start_time']).total_seconds() else: modules_status = {module: {'start_time': None, 'completion_time': None, 'duration': None} for module in self._modules} + modules_status['cumulative_execution_duration'] = 0 + modules_status['total_duration'] = 0 with open(self._modules_input_hash_fp, 'w') as f: json.dump(modules_status, f, default=str) @@ -248,10 +255,30 @@ def _get_module_status(self, module): if self._modules_input_hash_fp.exists(): with open(self._modules_input_hash_fp) as f: modules_status = json.load(f) + if modules_status[module]['completion_time'] is None: + # additional logic to read from the "-output.json" file for this module as well + # handle cases where the module has finished successfully, + # but the "_modules_input_hash_fp" is not updated (for whatever reason), + # resulting in this module not registered as completed in the "_modules_input_hash_fp" + modules_module_output_json_fp = pathlib.Path(self._get_module_output_json_filename(module)) + if modules_module_output_json_fp.exists(): + with open(modules_module_output_json_fp) as f: + module_run_output = json.load(f) + modules_status[module]['duration'] = module_run_output['execution_time'] + modules_status[module]['completion_time'] = ( + modules_status[module]['start_time'] + + timedelta(seconds=module_run_output['execution_time'])) return modules_status[module] return {'start_time': None, 'completion_time': None, 'duration': None} + def _get_module_output_json_filename(self, module): + module_input_json = self._module_input_json.as_posix() + module_output_json = module_input_json.replace( + '-input.json', + '-' + module + '-' + self._modules_input_hash + '-output.json') + return module_output_json + class OpenEphysKilosortPipeline: """ diff --git a/element_array_ephys/readers/openephys.py b/element_array_ephys/readers/openephys.py index 1a37a4ba..0d39dd55 100644 --- a/element_array_ephys/readers/openephys.py +++ b/element_array_ephys/readers/openephys.py @@ -145,7 +145,7 @@ def load_probe_data(self): else: continue # not continuous data for the current probe else: - raise ValueError(f'Unable to infer type (AP or LFP) for the continuous data from:\n\t{continuous_info}') + raise ValueError(f'Unable to infer type (AP or LFP) for the continuous data from:\n\t{continuous_info["folder_name"]}') if continuous_type == 'ap': probe.recording_info['recording_count'] += 1 From b97566e6b833d610377e93cd21a08a0272f3a075 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Tue, 7 Jun 2022 15:56:40 -0500 Subject: [PATCH 2/6] code cleanup, minor bugfix --- .../readers/kilosort_triggering.py | 78 +++++++++++++++---- 1 file changed, 65 insertions(+), 13 deletions(-) diff --git a/element_array_ephys/readers/kilosort_triggering.py b/element_array_ephys/readers/kilosort_triggering.py index 1c746373..9f714f6d 100644 --- a/element_array_ephys/readers/kilosort_triggering.py +++ b/element_array_ephys/readers/kilosort_triggering.py @@ -198,6 +198,10 @@ def run_modules(self): + " --output_json " + module_output_json) start_time = datetime.utcnow() + self._update_module_status( + {module: {'start_time': start_time, + 'completion_time': None, + 'duration': None}}) with open(module_logfile, "a") as f: subprocess.check_call(command.split(' '), stdout=f) completion_time = datetime.utcnow() @@ -206,6 +210,8 @@ def run_modules(self): 'completion_time': completion_time, 'duration': (completion_time - start_time).total_seconds()}}) + self._update_total_duration() + def _get_raw_data_filepaths(self): session_str, gate_str, _, probe_str = self.parse_input_filename() @@ -232,19 +238,11 @@ def _update_module_status(self, updated_module_status={}): with open(self._modules_input_hash_fp) as f: modules_status = json.load(f) modules_status = {**modules_status, **updated_module_status} - modules_status['cumulative_execution_duration'] = sum( - v['duration'] or 0 for k, v in modules_status.items() - if k not in ('cumulative_execution_duration', 'total_duration')) - modules_status['total_duration'] = ( - modules_status[self._modules[-1]]['completion_time'] - - modules_status[self._modules[0]]['start_time']).total_seconds() else: modules_status = {module: {'start_time': None, 'completion_time': None, 'duration': None} for module in self._modules} - modules_status['cumulative_execution_duration'] = 0 - modules_status['total_duration'] = 0 with open(self._modules_input_hash_fp, 'w') as f: json.dump(modules_status, f, default=str) @@ -260,13 +258,13 @@ def _get_module_status(self, module): # handle cases where the module has finished successfully, # but the "_modules_input_hash_fp" is not updated (for whatever reason), # resulting in this module not registered as completed in the "_modules_input_hash_fp" - modules_module_output_json_fp = pathlib.Path(self._get_module_output_json_filename(module)) - if modules_module_output_json_fp.exists(): - with open(modules_module_output_json_fp) as f: + module_output_json_fp = pathlib.Path(self._get_module_output_json_filename(module)) + if module_output_json_fp.exists(): + with open(module_output_json_fp) as f: module_run_output = json.load(f) modules_status[module]['duration'] = module_run_output['execution_time'] modules_status[module]['completion_time'] = ( - modules_status[module]['start_time'] + datetime.strptime(modules_status[module]['start_time'], '%Y-%m-%d %H:%M:%S.%f') + timedelta(seconds=module_run_output['execution_time'])) return modules_status[module] @@ -276,9 +274,23 @@ def _get_module_output_json_filename(self, module): module_input_json = self._module_input_json.as_posix() module_output_json = module_input_json.replace( '-input.json', - '-' + module + '-' + self._modules_input_hash + '-output.json') + '-' + module + '-' + str(self._modules_input_hash) + '-output.json') return module_output_json + def _update_total_duration(self): + with open(self._modules_input_hash_fp) as f: + modules_status = json.load(f) + cumulative_execution_duration = sum( + v['duration'] or 0 for k, v in modules_status.items() + if k not in ('cumulative_execution_duration', 'total_duration')) + total_duration = ( + datetime.strptime(modules_status[self._modules[-1]]['completion_time'], '%Y-%m-%d %H:%M:%S.%f') + - datetime.strptime(modules_status[self._modules[0]]['start_time'], '%Y-%m-%d %H:%M:%S.%f') + ).total_seconds() + self._update_module_status( + {'cumulative_execution_duration': cumulative_execution_duration, + 'total_duration': total_duration}) + class OpenEphysKilosortPipeline: """ @@ -388,6 +400,10 @@ def run_modules(self): + " --output_json " + module_output_json) start_time = datetime.utcnow() + self._update_module_status( + {module: {'start_time': start_time, + 'completion_time': None, + 'duration': None}}) with open(module_logfile, "a") as f: subprocess.check_call(command.split(' '), stdout=f) completion_time = datetime.utcnow() @@ -396,6 +412,8 @@ def run_modules(self): 'completion_time': completion_time, 'duration': (completion_time - start_time).total_seconds()}}) + self._update_total_duration() + def _update_module_status(self, updated_module_status={}): if self._modules_input_hash is None: raise RuntimeError('"generate_modules_input_json()" not yet performed!') @@ -420,10 +438,44 @@ def _get_module_status(self, module): if self._modules_input_hash_fp.exists(): with open(self._modules_input_hash_fp) as f: modules_status = json.load(f) + if modules_status[module]['completion_time'] is None: + # additional logic to read from the "-output.json" file for this module as well + # handle cases where the module has finished successfully, + # but the "_modules_input_hash_fp" is not updated (for whatever reason), + # resulting in this module not registered as completed in the "_modules_input_hash_fp" + module_output_json_fp = pathlib.Path(self._get_module_output_json_filename(module)) + if module_output_json_fp.exists(): + with open(module_output_json_fp) as f: + module_run_output = json.load(f) + modules_status[module]['duration'] = module_run_output['execution_time'] + modules_status[module]['completion_time'] = ( + datetime.strptime(modules_status[module]['start_time'], '%Y-%m-%d %H:%M:%S.%f') + + timedelta(seconds=module_run_output['execution_time'])) return modules_status[module] return {'start_time': None, 'completion_time': None, 'duration': None} + def _get_module_output_json_filename(self, module): + module_input_json = self._module_input_json.as_posix() + module_output_json = module_input_json.replace( + '-input.json', + '-' + module + '-' + str(self._modules_input_hash) + '-output.json') + return module_output_json + + def _update_total_duration(self): + with open(self._modules_input_hash_fp) as f: + modules_status = json.load(f) + cumulative_execution_duration = sum( + v['duration'] or 0 for k, v in modules_status.items() + if k not in ('cumulative_execution_duration', 'total_duration')) + total_duration = ( + datetime.strptime(modules_status[self._modules[-1]]['completion_time'], '%Y-%m-%d %H:%M:%S.%f') + - datetime.strptime(modules_status[self._modules[0]]['start_time'], '%Y-%m-%d %H:%M:%S.%f') + ).total_seconds() + self._update_module_status( + {'cumulative_execution_duration': cumulative_execution_duration, + 'total_duration': total_duration}) + def run_pykilosort(continuous_file, kilosort_output_directory, params, channel_ind, x_coords, y_coords, shank_ind, connected, sample_rate): From 3f1ee371bec5c68a2c9838082df87b6368074ebd Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Wed, 8 Jun 2022 23:54:51 -0500 Subject: [PATCH 3/6] bugfix, match new implementation for openephys --- element_array_ephys/readers/kilosort_triggering.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/element_array_ephys/readers/kilosort_triggering.py b/element_array_ephys/readers/kilosort_triggering.py index 9f714f6d..c4c86849 100644 --- a/element_array_ephys/readers/kilosort_triggering.py +++ b/element_array_ephys/readers/kilosort_triggering.py @@ -392,8 +392,7 @@ def run_modules(self): if module_status['completion_time'] is not None: continue - module_output_json = module_input_json.replace('-input.json', - '-' + module + '-output.json') + module_output_json = self._get_module_output_json_filename(module) command = (sys.executable + " -W ignore -m ecephys_spike_sorting.modules." + module + " --input_json " + module_input_json From 0c77826af1141d0e2d5828736252b33e56734af5 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 30 Jun 2022 16:38:35 -0500 Subject: [PATCH 4/6] improve kilosort calls, handle spaces in paths --- element_array_ephys/readers/kilosort_triggering.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/element_array_ephys/readers/kilosort_triggering.py b/element_array_ephys/readers/kilosort_triggering.py index c4c86849..e737a783 100644 --- a/element_array_ephys/readers/kilosort_triggering.py +++ b/element_array_ephys/readers/kilosort_triggering.py @@ -392,11 +392,11 @@ def run_modules(self): if module_status['completion_time'] is not None: continue - module_output_json = self._get_module_output_json_filename(module) - command = (sys.executable - + " -W ignore -m ecephys_spike_sorting.modules." + module - + " --input_json " + module_input_json - + " --output_json " + module_output_json) + module_output_json = self._get_module_output_json_filename(module) + command = [sys.executable, + '-W', 'ignore', '-m', 'ecephys_spike_sorting.modules.' + module, + '--input_json', module_input_json.replace(' ', '\ '), + '--output_json', module_output_json.replace(' ', '\ ')] start_time = datetime.utcnow() self._update_module_status( @@ -404,7 +404,7 @@ def run_modules(self): 'completion_time': None, 'duration': None}}) with open(module_logfile, "a") as f: - subprocess.check_call(command.split(' '), stdout=f) + subprocess.check_call(command, stdout=f) completion_time = datetime.utcnow() self._update_module_status( {module: {'start_time': start_time, From b71b459744b212251d0685b7bebb82d859fc8723 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 30 Jun 2022 16:45:07 -0500 Subject: [PATCH 5/6] remove space escaping character --- element_array_ephys/readers/kilosort_triggering.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/element_array_ephys/readers/kilosort_triggering.py b/element_array_ephys/readers/kilosort_triggering.py index e737a783..ccb21312 100644 --- a/element_array_ephys/readers/kilosort_triggering.py +++ b/element_array_ephys/readers/kilosort_triggering.py @@ -395,8 +395,8 @@ def run_modules(self): module_output_json = self._get_module_output_json_filename(module) command = [sys.executable, '-W', 'ignore', '-m', 'ecephys_spike_sorting.modules.' + module, - '--input_json', module_input_json.replace(' ', '\ '), - '--output_json', module_output_json.replace(' ', '\ ')] + '--input_json', module_input_json, + '--output_json', module_output_json] start_time = datetime.utcnow() self._update_module_status( From a3c5c2fb9c03e3b6df293ed0e8fb58f17a20ef78 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Tue, 19 Jul 2022 12:43:03 -0500 Subject: [PATCH 6/6] improve error message --- element_array_ephys/ephys_acute.py | 3 +++ element_array_ephys/ephys_chronic.py | 3 +++ element_array_ephys/ephys_no_curation.py | 3 +++ 3 files changed, 9 insertions(+) diff --git a/element_array_ephys/ephys_acute.py b/element_array_ephys/ephys_acute.py index 5a0a79e3..320db517 100644 --- a/element_array_ephys/ephys_acute.py +++ b/element_array_ephys/ephys_acute.py @@ -295,6 +295,9 @@ def make(self, key): raise FileNotFoundError( 'No Open Ephys data found for probe insertion: {}'.format(key)) + if not probe_data.ap_meta: + raise IOError('No analog signals found - check "structure.oebin" file or "continuous" directory') + if probe_data.probe_model in supported_probe_types: probe_type = probe_data.probe_model electrode_query = probe.ProbeType.Electrode & {'probe_type': probe_type} diff --git a/element_array_ephys/ephys_chronic.py b/element_array_ephys/ephys_chronic.py index 9a9c3df9..d8162126 100644 --- a/element_array_ephys/ephys_chronic.py +++ b/element_array_ephys/ephys_chronic.py @@ -242,6 +242,9 @@ def make(self, key): raise FileNotFoundError( 'No Open Ephys data found for probe insertion: {}'.format(key)) + if not probe_data.ap_meta: + raise IOError('No analog signals found - check "structure.oebin" file or "continuous" directory') + if probe_data.probe_model in supported_probe_types: probe_type = probe_data.probe_model electrode_query = probe.ProbeType.Electrode & {'probe_type': probe_type} diff --git a/element_array_ephys/ephys_no_curation.py b/element_array_ephys/ephys_no_curation.py index 6adb2eef..bbd00fa1 100644 --- a/element_array_ephys/ephys_no_curation.py +++ b/element_array_ephys/ephys_no_curation.py @@ -293,6 +293,9 @@ def make(self, key): raise FileNotFoundError( 'No Open Ephys data found for probe insertion: {}'.format(key)) + if not probe_data.ap_meta: + raise IOError('No analog signals found - check "structure.oebin" file or "continuous" directory') + if probe_data.probe_model in supported_probe_types: probe_type = probe_data.probe_model electrode_query = probe.ProbeType.Electrode & {'probe_type': probe_type}