diff --git a/requirements.txt b/requirements.txt
index acfef3f..d3eaa17 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
-dandi==0.58.0
+dandi
 hdmf==3.6.1
 colorcet
 cellpose
@@ -8,6 +8,4 @@ ndx-photometry @ git+https://github.com/catalystneuro/ndx-photometry.git@7ea9d75
 ndx-depth-moseq @ git+https://github.com/catalystneuro/ndx-depth-moseq.git@main
 pyarrow
 neuroconv
-nwbwidgets
-nwbinspector
 pre-commit
diff --git a/src/datta_lab_to_nwb/markowitz_gillis_nature_2023/convert_dataset.py b/src/datta_lab_to_nwb/markowitz_gillis_nature_2023/convert_dataset.py
index bf3ce46..fece989 100644
--- a/src/datta_lab_to_nwb/markowitz_gillis_nature_2023/convert_dataset.py
+++ b/src/datta_lab_to_nwb/markowitz_gillis_nature_2023/convert_dataset.py
@@ -1,11 +1,14 @@
+import traceback
+import json
 from pathlib import Path
 from typing import Union
+from concurrent.futures import ProcessPoolExecutor, as_completed
+
+import pandas as pd
 from neuroconv.utils import load_dict_from_file
 from tqdm import tqdm
-import pandas as pd
-import yaml
-from datta_lab_to_nwb.markowitz_gillis_nature_2023.convert_session import session_to_nwb
-import shutil
+
+from datta_lab_to_nwb.markowitz_gillis_nature_2023.convert_session import _safe_session_to_nwb

 folder_name_to_experiment_type = {
     "_aggregate_results_arhmm_03": "reinforcement",
@@ -32,73 +35,131 @@
 }


-def dataset_to_nwb(
+def get_all_processed_uuids(
+    *,
     processed_path: Union[str, Path],
-    raw_dir_path: Union[str, Path],
     output_dir_path: Union[str, Path],
-    skip_sessions: set,
-    num_sessions: int = None,
-):
+) -> set:
     processed_path = Path(processed_path)
-    raw_dir_path = Path(raw_dir_path)
     output_dir_path = Path(output_dir_path)
-    output_dir_path.mkdir(parents=True, exist_ok=True)
+
+    uuid_file_path = output_dir_path.parent / "all_processed_uuids.txt"
+
+    if uuid_file_path.exists():
+        with open(file=uuid_file_path, mode="r") as io:
+            all_processed_uuids = set(json.load(fp=io))
+        return all_processed_uuids

     photometry_uuids = pd.read_parquet(
         processed_path / "dlight_raw_data/dlight_photometry_processed_full.parquet", columns=["uuid"]
     )
-    photometry_uuids = set(photometry_uuids["uuid"])
+    unique_photometry_uuids = set(photometry_uuids["uuid"])
+    del photometry_uuids
+
     reinforcement_uuids = pd.read_parquet(
         processed_path / "optoda_raw_data/closed_loop_behavior.parquet", columns=["uuid"]
     )
-    reinforcement_uuids = set(reinforcement_uuids["uuid"])
+    unique_reinforcement_uuids = set(reinforcement_uuids["uuid"])
+    del reinforcement_uuids
+
     velocity_uuids = pd.read_parquet(
         processed_path / "optoda_raw_data/closed_loop_behavior_velocity_conditioned.parquet", columns=["uuid"]
     )
-    velocity_uuids = set(velocity_uuids["uuid"])
-    all_processed_uuids = photometry_uuids.union(reinforcement_uuids).union(velocity_uuids)
+    unique_velocity_uuids = set(velocity_uuids["uuid"])
+    del velocity_uuids
+
+    all_processed_uuids = unique_photometry_uuids | unique_reinforcement_uuids | unique_velocity_uuids
+
+    with open(file=uuid_file_path, mode="w") as io:
+        json.dump(obj=list(all_processed_uuids), fp=io)
+    return all_processed_uuids
+
+
+def dataset_to_nwb(
+    *,
+    processed_path: Union[str, Path],
+    raw_dir_path: Union[str, Path],
+    output_dir_path: Union[str, Path],
+    skip_sessions: set,
+    number_of_jobs: int,
+    num_sessions_per_experiment: int = None,
+):
+    processed_path = Path(processed_path)
+    raw_dir_path = Path(raw_dir_path)
+    output_dir_path = Path(output_dir_path)
+
+    log_folder_path = output_dir_path.parent / "logs"
+    log_folder_path.mkdir(exist_ok=True)
+
+    missing_folder_path = output_dir_path.parent / "missing"
+    missing_folder_path.mkdir(exist_ok=True)
+
+    all_processed_uuids = get_all_processed_uuids(processed_path=processed_path, output_dir_path=output_dir_path)
+
     experimental_folders = [
-        folder for folder in raw_dir_path.iterdir() if folder.is_dir() and folder.name not in skip_experiments
+        folder
+        for folder in raw_dir_path.iterdir()
+        if folder.is_dir() and folder.name not in skip_experiments and folder.name.startswith("_")
     ]
-    for experimental_folder in tqdm(experimental_folders):
+    for experimental_folder in tqdm(iterable=experimental_folders, position=0, desc="Converting experiments..."):
         experiment_type = folder_name_to_experiment_type[experimental_folder.name]
         session_folders = [
             folder for folder in experimental_folder.iterdir() if folder.is_dir() and folder.name not in skip_sessions
         ]
-        if num_sessions is None:
-            num_sessions = len(session_folders) + 1
+        if num_sessions_per_experiment is None:
+            num_sessions_per_experiment = len(session_folders) + 1
         session_num = 0
-        for session_folder in session_folders:
-            print(f"Processing {session_folder.name}")
-            results_file = session_folder / "proc" / "results_00.yaml"
-            results = load_dict_from_file(results_file)
-            session_uuid = results["uuid"]
-            if session_uuid not in all_processed_uuids:
-                continue
-            session_to_nwb(
-                session_uuid=session_uuid,
-                experiment_type=experiment_type,
-                processed_path=processed_path,
-                raw_path=session_folder,
-                output_dir_path=output_dir_path,
+
+        futures = list()
+        with ProcessPoolExecutor(max_workers=number_of_jobs) as executor:
+            for session_folder in session_folders:
+                error_identifier_base = f"{experimental_folder.name}_{session_folder.name}"
+
+                results_file = session_folder / "proc" / "results_00.yaml"
+
+                if not results_file.exists():
+                    (missing_folder_path / f"{error_identifier_base}.txt").touch()
+                    continue
+
+                results = load_dict_from_file(results_file)
+                session_uuid = results["uuid"]
+                if session_uuid not in all_processed_uuids:
+                    continue
+
+                futures.append(
+                    executor.submit(
+                        _safe_session_to_nwb,
+                        session_uuid=session_uuid,
+                        experiment_type=experiment_type,
+                        processed_path=processed_path,
+                        raw_path=session_folder,
+                        output_dir_path=output_dir_path,
+                        log_file_path=log_folder_path / f"{error_identifier_base}_{session_uuid}.txt",
+                    ),
+                )
+
+                session_num += 1
+                if session_num >= num_sessions_per_experiment:
+                    break
+
+        parallel_iterable = tqdm(
+            iterable=as_completed(futures), position=1, desc="Converting sessions in parallel..."
             )
-            session_num += 1
-            if session_num >= num_sessions:
-                break
+        for _ in parallel_iterable:
+            pass


 if __name__ == "__main__":
-    processed_path = Path("NWB/DattaConv/processed_data")
-    raw_dir_path = Path("NWB/DattaConv/raw_data")
-    output_dir_path = Path("NWB/DattaConv/conversion_output")
-    if output_dir_path.exists():
-        shutil.rmtree(
-            output_dir_path, ignore_errors=True
-        )  # ignore errors due to MacOS race condition (https://github.com/python/cpython/issues/81441)
+    number_of_jobs = 4
+
+    processed_path = Path("E:/Datta/dopamine-reinforces-spontaneous-behavior")
+    raw_dir_path = Path("E:/Datta")
+    output_dir_path = Path("E:/datta_output/files")
+
     skip_experiments = {
         "keypoint",  # no proc folder for keypoints
     }
-    skip_sessions = {
+    temporary_skip_sessions = {
         "session_20210420113646-974717",  # _aggregate_results_arhmm_photometry_excitation_pulsed_01: missing everything except depth video
         "session_20210309134748-687283",  # _aggregate_results_arhmm_excitation_03: missing everything except depth video
         "session_20210224083612-947426",  # _aggregate_results_arhmm_excitation_03: missing proc folder
@@ -120,5 +181,14 @@ def dataset_to_nwb(
         "session_20210429135801-758690",  # _aggregate_results_arhmm_09: missing everything except depth video
         "session_20191111130454-333065",  # _aggregate_results_arhmm_05: missing proc folder
         "session_20191111130847-263894",  # _aggregate_results_arhmm_05: missing proc folder
+        "session_20200720110309-817092",
+        "session_20210115130943-880998",
     }
-    dataset_to_nwb(processed_path, raw_dir_path, output_dir_path, skip_sessions, num_sessions=1)
+    dataset_to_nwb(
+        processed_path=processed_path,
+        raw_dir_path=raw_dir_path,
+        output_dir_path=output_dir_path,
+        skip_sessions=temporary_skip_sessions,
+        number_of_jobs=number_of_jobs,
+        num_sessions_per_experiment=1,
+    )
diff --git a/src/datta_lab_to_nwb/markowitz_gillis_nature_2023/convert_session.py b/src/datta_lab_to_nwb/markowitz_gillis_nature_2023/convert_session.py
index 5ffc1ae..e7e177f 100644
--- a/src/datta_lab_to_nwb/markowitz_gillis_nature_2023/convert_session.py
+++ b/src/datta_lab_to_nwb/markowitz_gillis_nature_2023/convert_session.py
@@ -1,18 +1,42 @@
 """Primary script to run to convert an entire session for of data using the NWBConverter."""
-# Standard Library
-from pathlib import Path
 import shutil
+import traceback
+from pathlib import Path
 from typing import Union, Literal

-# Third Party
 from neuroconv.utils import dict_deep_update, load_dict_from_file
 from pynwb import NWBHDF5IO

-# Local
 from datta_lab_to_nwb.markowitz_gillis_nature_2023.postconversion import reproduce_fig1d
 from datta_lab_to_nwb.markowitz_gillis_nature_2023.nwbconverter import DattaNWBConverter


+def _safe_session_to_nwb(
+    *,
+    session_uuid: str,
+    processed_path: Union[str, Path],
+    raw_path: Union[str, Path],
+    output_dir_path: Union[str, Path],
+    experiment_type: Literal["reinforcement", "photometry", "reinforcement-photometry", "velocity-modulation"],
+    log_file_path: Path,
+    processed_only: bool = False,
+    stub_test: bool = False,
+):
+    try:
+        session_to_nwb(
+            session_uuid=session_uuid,
+            processed_path=processed_path,
+            raw_path=raw_path,
+            output_dir_path=output_dir_path,
+            experiment_type=experiment_type,
+            processed_only=processed_only,
+            stub_test=stub_test,
+        )
+    except Exception as exception:
+        with open(file=log_file_path, mode="w") as io:
+            io.write(f"{type(exception)}: {str(exception)}\n\n{traceback.format_exc()}")
+
+
 def session_to_nwb(
     session_uuid: str,
     processed_path: Union[str, Path],
@@ -139,7 +163,9 @@ def session_to_nwb(
     metadata = dict_deep_update(metadata, paper_metadata)

     # Run conversion
-    converter.run_conversion(metadata=metadata, nwbfile_path=nwbfile_path, conversion_options=conversion_options)
+    converter.run_conversion(
+        metadata=metadata, nwbfile_path=nwbfile_path, conversion_options=conversion_options, overwrite=True
+    )


 if __name__ == "__main__":
diff --git a/src/datta_lab_to_nwb/markowitz_gillis_nature_2023/optogeneticinterface.py b/src/datta_lab_to_nwb/markowitz_gillis_nature_2023/optogeneticinterface.py
index a26efda..cc8c150 100644
--- a/src/datta_lab_to_nwb/markowitz_gillis_nature_2023/optogeneticinterface.py
+++ b/src/datta_lab_to_nwb/markowitz_gillis_nature_2023/optogeneticinterface.py
@@ -115,6 +115,12 @@ def add_to_nwbfile(self, nwbfile: NWBFile, metadata: dict, velocity_modulation:
             ],
         )

+        # Reconstruct optogenetic series from feedback status
+        if pd.isnull(metadata["Optogenetics"]["stim_frequency_Hz"]):  # cts stim
+            data, timestamps = self.reconstruct_cts_stim(metadata, session_df, session_timestamps)
+        else:  # pulsed stim
+            data, timestamps = self.reconstruct_pulsed_stim(metadata, session_df, session_timestamps)
+
         device = nwbfile.create_device(
             name="OptoEngineMRL",
             description="Optogenetic stimulator (Opto Engine MRL-III-635; SKU: RD-635-00500-CWM-SD-03-LED-0)",
@@ -127,11 +133,6 @@ def add_to_nwbfile(self, nwbfile: NWBFile, metadata: dict, velocity_modulation:
             excitation_lambda=635.0,
             location=metadata["Optogenetics"]["area"],
         )
-        # Reconstruct optogenetic series from feedback status
-        if pd.isnull(metadata["Optogenetics"]["stim_frequency_Hz"]):  # cts stim
-            data, timestamps = self.reconstruct_cts_stim(metadata, session_df, session_timestamps)
-        else:  # pulsed stim
-            data, timestamps = self.reconstruct_pulsed_stim(metadata, session_df, session_timestamps)
         id2sorted_index = metadata["BehavioralSyllable"]["id2sorted_index"]
         target_syllables = [id2sorted_index[syllable_id] for syllable_id in metadata["Optogenetics"]["target_syllable"]]
         ogen_series = OptogeneticSeries(
diff --git a/src/datta_lab_to_nwb/markowitz_gillis_nature_2023/rawfiberphotometryinterface.py b/src/datta_lab_to_nwb/markowitz_gillis_nature_2023/rawfiberphotometryinterface.py
index fd76cd8..1fbb2fb 100644
--- a/src/datta_lab_to_nwb/markowitz_gillis_nature_2023/rawfiberphotometryinterface.py
+++ b/src/datta_lab_to_nwb/markowitz_gillis_nature_2023/rawfiberphotometryinterface.py
@@ -132,7 +132,7 @@ def add_to_nwbfile(self, nwbfile: NWBFile, metadata: dict) -> None:
             ),
             data=H5DataIO(commanded_signal, compression=True),
             frequency=metadata["FiberPhotometry"]["signal_freq"],
-            power=metadata["FiberPhotometry"]["signal_amp"],  # TODO: Fix this in ndx-photometry
+            power=float(metadata["FiberPhotometry"]["signal_amp"]),  # TODO: Fix this in ndx-photometry
             timestamps=H5DataIO(timestamps, compression=True),
             unit="volts",
         )
@@ -144,7 +144,7 @@ def add_to_nwbfile(self, nwbfile: NWBFile, metadata: dict) -> None:
             ),
             data=H5DataIO(commanded_reference, compression=True),
             frequency=metadata["FiberPhotometry"]["reference_freq"],
-            power=metadata["FiberPhotometry"]["reference_amp"],  # TODO: Fix this in ndx-photometry
+            power=float(metadata["FiberPhotometry"]["reference_amp"]),  # TODO: Fix this in ndx-photometry
             timestamps=commanded_signal_series.timestamps,
             unit="volts",
         )
diff --git a/src/datta_lab_to_nwb/markowitz_gillis_nature_2023/requirements.txt b/src/datta_lab_to_nwb/markowitz_gillis_nature_2023/requirements.txt
deleted file mode 100644
index 458b8a2..0000000
--- a/src/datta_lab_to_nwb/markowitz_gillis_nature_2023/requirements.txt
+++ /dev/null
@@ -1,3 +0,0 @@
-nwb-conversion-tools==0.11.1  # Example of specific pinned dependecy
-some-extra-package==1.11.3  # Example of another extra package that's necessary for the current conversion
-roiextractors @ git+https://github.com/catalystneuro/roiextractors.git@8db5f9cb3a7ee5efee49b7fd0b694c7a8105519a  # Github pinned dependency