From 04b2e6b6472dee8b3792cd604e80618c3f5528b3 Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Sun, 30 Jul 2023 19:14:53 +0000 Subject: [PATCH] updates and fixes --- .../brainwide_map/brainwide_map_metadata.yml | 4 +- ...t_brainwide_map_processed_only_parallel.py | 160 ++++++++++++++++++ .../brainwidemaptrialsinterface.py | 3 +- .../iblposeestimationinterface.py | 7 +- .../datainterfaces/iblsortingextractor.py | 5 - .../datainterfaces/iblstreaminginterface.py | 4 +- .../datainterfaces/lickinterface.py | 2 +- .../datainterfaces/pupiltrackinginterface.py | 2 +- .../roimotionenergyinterface.py | 2 +- .../datainterfaces/wheelinterface.py | 6 +- ibl_to_nwb/updated_conversion/iblconverter.py | 11 +- 11 files changed, 180 insertions(+), 26 deletions(-) create mode 100644 ibl_to_nwb/updated_conversion/brainwide_map/convert_brainwide_map_processed_only_parallel.py diff --git a/ibl_to_nwb/updated_conversion/brainwide_map/brainwide_map_metadata.yml b/ibl_to_nwb/updated_conversion/brainwide_map/brainwide_map_metadata.yml index e04a4b1..1c6dc83 100644 --- a/ibl_to_nwb/updated_conversion/brainwide_map/brainwide_map_metadata.yml +++ b/ibl_to_nwb/updated_conversion/brainwide_map/brainwide_map_metadata.yml @@ -3,7 +3,9 @@ NWBFile: IBL aims to understand the neural basis of decision-making in the mouse by gathering a whole-brain activity map composed of electrophysiological recordings pooled from multiple laboratories. We have systematically recorded from nearly all major brain areas with Neuropixels probes, using a grid system for unbiased sampling and replicating each recording site in at least two laboratories. These data have been used to construct a brain-wide map of activity at single-spike cellular resolution during a decision-making task. In addition to the map, this data set contains other information gathered during the task: sensory stimuli presented to the mouse; mouse decisions and response times; and mouse pose information from video recordings and DeepLabCut analysis. session_description: | The full description of the session/task protocol can be found in Appendix 2 of International Brain Laboratory, et al. "Standardized and reproducible measurement of decision-making in mice." Elife 10 (2021); e63711. - related_publications: "https://doi.org/10.6084/m9.figshare.21400815.v6, https://doi.org/10.1101/2020.01.17.909838" # Brain Wide Map white paper and behavior task protocol information + related_publications: + - "https://doi.org/10.6084/m9.figshare.21400815.v6" + - "https://doi.org/10.1101/2020.01.17.909838" # Brain Wide Map white paper and behavior task protocol information Subject: description: | Mice were housed under a 12/12 h light/dark cycle (normal or inverted depending on the laboratory) with food and water 112 available ad libitum, except during behavioural training days. Electrophysiological recordings and behavioural training were performed during either the dark or light phase of the subject cycle depending on the laboratory. Subjects were obtained from either the Jackson Laboratory or Charles River. diff --git a/ibl_to_nwb/updated_conversion/brainwide_map/convert_brainwide_map_processed_only_parallel.py b/ibl_to_nwb/updated_conversion/brainwide_map/convert_brainwide_map_processed_only_parallel.py new file mode 100644 index 0000000..daf743d --- /dev/null +++ b/ibl_to_nwb/updated_conversion/brainwide_map/convert_brainwide_map_processed_only_parallel.py @@ -0,0 +1,160 @@ +import os + +os.environ["JUPYTER_PLATFORM_DIRS"] = "1" # Annoying + +import os +import traceback +from concurrent.futures import ProcessPoolExecutor, as_completed +from pathlib import Path +from shutil import rmtree +from tempfile import mkdtemp + +from dandi.download import download as dandi_download +from dandi.organize import organize as dandi_organize +from dandi.upload import upload as dandi_upload +from neuroconv.tools.data_transfers import automatic_dandi_upload +from one.api import ONE +from pynwb import NWBHDF5IO +from pynwb.image import ImageSeries +from tqdm import tqdm +from nwbinspector.tools import get_s3_urls_and_dandi_paths + +from ibl_to_nwb.updated_conversion.brainwide_map import BrainwideMapConverter +from ibl_to_nwb.updated_conversion.brainwide_map.datainterfaces import ( + BrainwideMapTrialsInterface, +) +from ibl_to_nwb.updated_conversion.datainterfaces import ( + IblPoseEstimationInterface, + IblSortingInterface, + IblStreamingApInterface, + IblStreamingLfInterface, + LickInterface, + PupilTrackingInterface, + RoiMotionEnergyInterface, + WheelInterface, +) + + +def convert_and_upload_parallel_processed_only( + base_path: Path, + session: str, + nwbfile_path: str, + stub_test: bool = False, + progress_position: int = 0, + cleanup: bool = False, + files_mode: str = "move", +): + try: + assert len(os.environ.get("DANDI_API_KEY", "")) > 0, "Run `export DANDI_API_KEY=...`!" + + nwbfile_path.parent.mkdir(exist_ok=True) + + # Download behavior and spike sorted data for this session + session_path = base_path / "ibl_conversion" / session + cache_folder = base_path / "ibl_conversion" / session / "cache" + session_one = ONE( + base_url="https://openalyx.internationalbrainlab.org", + password="international", + silent=True, + cache_dir=cache_folder, + ) + + # Initialize as many of each interface as we need across the streams + data_interfaces = list() + + # These interfaces should always be present in source data + data_interfaces.append(IblSortingInterface(session=session, cache_folder=cache_folder / "sorting")) + data_interfaces.append(BrainwideMapTrialsInterface(one=session_one, session=session)) + data_interfaces.append(WheelInterface(one=session_one, session=session)) + + # These interfaces may not be present; check if they are before adding to list + pose_estimation_files = session_one.list_datasets(eid=session, filename="*.dlc*") + for pose_estimation_file in pose_estimation_files: + camera_name = pose_estimation_file.replace("alf/_ibl_", "").replace(".dlc.pqt", "") + data_interfaces.append( + IblPoseEstimationInterface(one=session_one, session=session, camera_name=camera_name, include_video=False) + ) + + pupil_tracking_files = session_one.list_datasets(eid=session, filename="*features*") + for pupil_tracking_file in pupil_tracking_files: + camera_name = pupil_tracking_file.replace("alf/_ibl_", "").replace(".features.pqt", "") + data_interfaces.append(PupilTrackingInterface(one=session_one, session=session, camera_name=camera_name)) + + roi_motion_energy_files = session_one.list_datasets(eid=session, filename="*ROIMotionEnergy.npy*") + for roi_motion_energy_file in roi_motion_energy_files: + camera_name = roi_motion_energy_file.replace("alf/", "").replace(".ROIMotionEnergy.npy", "") + data_interfaces.append(RoiMotionEnergyInterface(one=session_one, session=session, camera_name=camera_name)) + + if session_one.list_datasets(eid=session, collection="alf", filename="licks*"): + data_interfaces.append(LickInterface(one=session_one, session=session)) + + # Run conversion + session_converter = BrainwideMapConverter( + one=session_one, session=session, data_interfaces=data_interfaces, verbose=False + ) + + metadata = session_converter.get_metadata() + metadata["NWBFile"]["session_id"] = metadata["NWBFile"]["session_id"] + "-processed-only" + + session_converter.run_conversion( + nwbfile_path=nwbfile_path, + metadata=metadata, + overwrite=True, + ) + automatic_dandi_upload( + dandiset_id="000409", nwb_folder_path=nwbfile_path.parent, cleanup=cleanup, #files_mode=files_mode + ) + if cleanup: + rmtree(cache_folder) + rmtree(nwbfile_path.parent) + + return 1 + except Exception as exception: + error_file_path = base_path / "errors" / "7-30-23" / f"{session}_error.txt" + error_file_path.parent.mkdir(exist_ok=True) + with open(file=error_file_path, mode="w") as file: + file.write(f"{type(exception)}: {str(exception)}\n{traceback.format_exc()}") + return 0 + + +number_of_parallel_jobs = 8 +base_path = Path("/home/jovyan/IBL") # prototype on DANDI Hub for now + +session_retrieval_one = ONE( + base_url="https://openalyx.internationalbrainlab.org", password="international", silent=True +) +brain_wide_sessions = [ + session_info["id"] + for session_info in session_retrieval_one.alyx.rest(url="sessions", action="list", tag="2022_Q4_IBL_et_al_BWM") +] + +# Already written sessions +dandi_file_paths = list(get_s3_urls_and_dandi_paths(dandiset_id="000409").values()) +dandi_processed_file_paths = [dandi_file_path for dandi_file_path in dandi_file_paths if "processed" in dandi_file_path] +already_written_processed_sessions = [ + processed_dandi_file_path.split("ses-")[1].split("_")[0].strip("-processed-only") + for processed_dandi_file_path in dandi_processed_file_paths +] +sessions_to_run = list(set(brain_wide_sessions) - set(already_written_processed_sessions)) + +with ProcessPoolExecutor(max_workers=number_of_parallel_jobs) as executor: + with tqdm(total=len(sessions_to_run), position=0, desc="Converting sessions...") as main_progress_bar: + futures = [] + for progress_position, session in enumerate(sessions_to_run): + nwbfile_path = base_path / "nwbfiles" / session / f"{session}.nwb" + nwbfile_path.parent.mkdir(exist_ok=True) + futures.append( + executor.submit( + convert_and_upload_parallel_processed_only, + base_path=base_path, + session=session, + nwbfile_path=nwbfile_path, + progress_position=1 + progress_position, + # stub_test=True, + # files_mode="copy", # useful when debugging + #cleanup=True, # causing shutil error ATM + ) + ) + for future in as_completed(futures): + status = future.result() + main_progress_bar.update(1) diff --git a/ibl_to_nwb/updated_conversion/brainwide_map/datainterfaces/brainwidemaptrialsinterface.py b/ibl_to_nwb/updated_conversion/brainwide_map/datainterfaces/brainwidemaptrialsinterface.py index c60f793..3a30043 100644 --- a/ibl_to_nwb/updated_conversion/brainwide_map/datainterfaces/brainwidemaptrialsinterface.py +++ b/ibl_to_nwb/updated_conversion/brainwide_map/datainterfaces/brainwidemaptrialsinterface.py @@ -28,7 +28,7 @@ def get_timestamps(self): def align_timestamps(self): pass - def run_conversion(self, nwbfile: NWBFile, metadata: dict): + def add_to_nwbfile(self, nwbfile: NWBFile, metadata: dict): trials = self.one.load_object(id=self.session, obj="trials", collection="alf") column_ordering = [ @@ -73,6 +73,7 @@ def run_conversion(self, nwbfile: NWBFile, metadata: dict): ) ) + # compression only works using the method above; method below fails # for start_time, stop_time in trials["intervals"]: # nwbfile.add_trial(start_time=start_time, stop_time=stop_time) diff --git a/ibl_to_nwb/updated_conversion/datainterfaces/iblposeestimationinterface.py b/ibl_to_nwb/updated_conversion/datainterfaces/iblposeestimationinterface.py index 165aa50..a9e6aca 100644 --- a/ibl_to_nwb/updated_conversion/datainterfaces/iblposeestimationinterface.py +++ b/ibl_to_nwb/updated_conversion/datainterfaces/iblposeestimationinterface.py @@ -10,10 +10,11 @@ class IblPoseEstimationInterface(BaseDataInterface): - def __init__(self, one: ONE, session: str, camera_name: str): + def __init__(self, one: ONE, session: str, camera_name: str, include_video: bool): self.one = one self.session = session self.camera_name = camera_name + self.include_video = include_video def get_original_timestamps(self): pass @@ -24,7 +25,7 @@ def get_timestamps(self): def align_timestamps(self): pass - def run_conversion(self, nwbfile, metadata: dict): + def add_to_nwbfile(self, nwbfile, metadata: dict): # Sometimes the DLC data has been revised, possibly multiple times # Always use the most recent revision available session_files = self.one.list_datasets(eid=self.session, filename=f"*{self.camera_name}.dlc*") @@ -76,7 +77,7 @@ def run_conversion(self, nwbfile, metadata: dict): source_software="DeepLabCut", nodes=body_parts, ) - if self.one.list_datasets(eid=self.session, filename=f"raw_video_data/*{self.camera_name}*"): + if self.include_video and self.one.list_datasets(eid=self.session, filename=f"raw_video_data/*{self.camera_name}*"): original_video_file = self.one.load_dataset( id=self.session, dataset=f"raw_video_data/*{self.camera_name}*", download_only=True ) diff --git a/ibl_to_nwb/updated_conversion/datainterfaces/iblsortingextractor.py b/ibl_to_nwb/updated_conversion/datainterfaces/iblsortingextractor.py index fb28f16..e6196f6 100644 --- a/ibl_to_nwb/updated_conversion/datainterfaces/iblsortingextractor.py +++ b/ibl_to_nwb/updated_conversion/datainterfaces/iblsortingextractor.py @@ -47,11 +47,6 @@ def __init__(self, session: str, cache_folder: Optional[DirectoryPath] = None): cluster_ids.extend(list(np.array(clusters["metrics"]["cluster_id"]) + unit_id_per_probe_shift)) number_of_units = len(np.unique(spikes["clusters"])) - print(f"{spikes['clusters']}=") - print(f"{len(set(spikes['clusters']))}=") - print(f"{clusters['metrics']['cluster_id']}=") - print(f"{clusters['metrics']}=") - # TODO - compare speed against iterating over unique cluster IDs + vector index search for spike_cluster, spike_times, spike_amplitudes, spike_depths in zip( spikes["clusters"], spikes["times"], spikes["amps"], spikes["depths"] diff --git a/ibl_to_nwb/updated_conversion/datainterfaces/iblstreaminginterface.py b/ibl_to_nwb/updated_conversion/datainterfaces/iblstreaminginterface.py index 03a7d08..add74cd 100644 --- a/ibl_to_nwb/updated_conversion/datainterfaces/iblstreaminginterface.py +++ b/ibl_to_nwb/updated_conversion/datainterfaces/iblstreaminginterface.py @@ -137,7 +137,7 @@ def get_metadata(self) -> dict: return metadata - def run_conversion(self, **kwargs): + def add_to_nwbfile(self, **kwargs): # The buffer and chunk shapes must be set explicitly for good performance with the streaming # Otherwise, the default buffer/chunk shapes might re-request the same data packet multiple times chunk_frames = 100 if kwargs.get("stub_test", False) else 30_000 @@ -156,7 +156,7 @@ def run_conversion(self, **kwargs): if "progress_position" in kwargs: kwargs.pop("progress_position") kwargs.update(es_key=self.es_key) - super().run_conversion(**kwargs) + super().add_to_nwbfile(**kwargs) class IblStreamingLfInterface(IblStreamingApInterface): diff --git a/ibl_to_nwb/updated_conversion/datainterfaces/lickinterface.py b/ibl_to_nwb/updated_conversion/datainterfaces/lickinterface.py index 9271e0d..4d0d85f 100644 --- a/ibl_to_nwb/updated_conversion/datainterfaces/lickinterface.py +++ b/ibl_to_nwb/updated_conversion/datainterfaces/lickinterface.py @@ -20,7 +20,7 @@ def get_timestamps(self): def align_timestamps(self): pass - def run_conversion(self, nwbfile, metadata: dict): + def add_to_nwbfile(self, nwbfile, metadata: dict): licks = self.one.load_object(id=self.session, obj="licks", collection="alf") lick_events_table = DynamicTable( diff --git a/ibl_to_nwb/updated_conversion/datainterfaces/pupiltrackinginterface.py b/ibl_to_nwb/updated_conversion/datainterfaces/pupiltrackinginterface.py index b4c53aa..a96f451 100644 --- a/ibl_to_nwb/updated_conversion/datainterfaces/pupiltrackinginterface.py +++ b/ibl_to_nwb/updated_conversion/datainterfaces/pupiltrackinginterface.py @@ -33,7 +33,7 @@ def get_timestamps(self): def align_timestamps(self): pass - def run_conversion(self, nwbfile, metadata: dict): + def add_to_nwbfile(self, nwbfile, metadata: dict): left_or_right = self.camera_name[:5].rstrip("C") camera_data = self.one.load_object(id=self.session, obj=self.camera_name, collection="alf") diff --git a/ibl_to_nwb/updated_conversion/datainterfaces/roimotionenergyinterface.py b/ibl_to_nwb/updated_conversion/datainterfaces/roimotionenergyinterface.py index 26ebe8f..489bf17 100644 --- a/ibl_to_nwb/updated_conversion/datainterfaces/roimotionenergyinterface.py +++ b/ibl_to_nwb/updated_conversion/datainterfaces/roimotionenergyinterface.py @@ -21,7 +21,7 @@ def get_timestamps(self): def align_timestamps(self): pass - def run_conversion(self, nwbfile, metadata: dict): + def add_to_nwbfile(self, nwbfile, metadata: dict): left_right_or_body = self.camera_name[:5].rstrip("C") camera_data = self.one.load_object(id=self.session, obj=self.camera_name, collection="alf") diff --git a/ibl_to_nwb/updated_conversion/datainterfaces/wheelinterface.py b/ibl_to_nwb/updated_conversion/datainterfaces/wheelinterface.py index 9135899..4413e79 100644 --- a/ibl_to_nwb/updated_conversion/datainterfaces/wheelinterface.py +++ b/ibl_to_nwb/updated_conversion/datainterfaces/wheelinterface.py @@ -31,7 +31,7 @@ def get_timestamps(self): def align_timestamps(self): pass - def run_conversion(self, nwbfile, metadata: dict): + def add_to_nwbfile(self, nwbfile, metadata: dict): wheel_moves = self.one.load_object(id=self.session, obj="wheelMoves", collection="alf") wheel = self.one.load_object(id=self.session, obj="wheel", collection="alf") @@ -40,9 +40,7 @@ def run_conversion(self, nwbfile, metadata: dict): interpolated_position, interpolated_timestamps = wheel_methods.interpolate_position( re_ts=wheel["timestamps"], re_pos=wheel["position"], freq=interpolation_frequency ) - velocity, acceleration = wheel_methods.velocity_filtered( - pos=interpolated_position, freq=interpolation_frequency - ) + velocity, acceleration = wheel_methods.velocity_filtered(pos=interpolated_position, fs=interpolation_frequency) # Deterministically regular interpolated_starting_time = interpolated_timestamps[0] diff --git a/ibl_to_nwb/updated_conversion/iblconverter.py b/ibl_to_nwb/updated_conversion/iblconverter.py index cd2f06c..9873b91 100644 --- a/ibl_to_nwb/updated_conversion/iblconverter.py +++ b/ibl_to_nwb/updated_conversion/iblconverter.py @@ -120,11 +120,8 @@ def run_conversion( metadata = self.get_metadata() self.validate_metadata(metadata=metadata) - if conversion_options is None: - conversion_options = dict() - default_conversion_options = self.get_conversion_options() - conversion_options_to_run = dict_deep_update(default_conversion_options, conversion_options) - self.validate_conversion_options(conversion_options=conversion_options_to_run) + conversion_options = conversion_options or dict() + self.validate_conversion_options(conversion_options=conversion_options) with make_or_load_nwbfile( nwbfile_path=nwbfile_path, @@ -135,8 +132,8 @@ def run_conversion( ) as nwbfile_out: nwbfile_out.subject = ibl_subject for interface_name, data_interface in self.data_interface_objects.items(): - data_interface.run_conversion( - nwbfile=nwbfile_out, metadata=metadata, **conversion_options_to_run.get(interface_name, dict()) + data_interface.add_to_nwbfile( + nwbfile=nwbfile_out, metadata=metadata, **conversion_options.get(interface_name, dict()) ) return nwbfile_out