Commit

debugs
CodyCBakerPhD committed Dec 6, 2023
1 parent c17c549 commit 3a76dcf
Showing 6 changed files with 154 additions and 62 deletions.
4 changes: 1 addition & 3 deletions requirements.txt
@@ -1,4 +1,4 @@
dandi==0.58.0
dandi
hdmf==3.6.1
colorcet
cellpose
@@ -8,6 +8,4 @@ ndx-photometry @ git+https://github.com/catalystneuro/ndx-photometry.git@7ea9d75
ndx-depth-moseq @ git+https://github.com/catalystneuro/ndx-depth-moseq.git@main
pyarrow
neuroconv
nwbwidgets
nwbinspector
pre-commit
158 changes: 114 additions & 44 deletions src/datta_lab_to_nwb/markowitz_gillis_nature_2023/convert_dataset.py
@@ -1,11 +1,14 @@
import traceback
import json
from pathlib import Path
from typing import Union
from concurrent.futures import ProcessPoolExecutor, as_completed

import pandas as pd
from neuroconv.utils import load_dict_from_file
from tqdm import tqdm
import pandas as pd
import yaml
from datta_lab_to_nwb.markowitz_gillis_nature_2023.convert_session import session_to_nwb
import shutil

from datta_lab_to_nwb.markowitz_gillis_nature_2023.convert_session import _safe_session_to_nwb

folder_name_to_experiment_type = {
"_aggregate_results_arhmm_03": "reinforcement",
@@ -32,73 +35,131 @@
}


def dataset_to_nwb(
def get_all_processed_uuids(
*,
processed_path: Union[str, Path],
raw_dir_path: Union[str, Path],
output_dir_path: Union[str, Path],
skip_sessions: set,
num_sessions: int = None,
):
) -> set:
processed_path = Path(processed_path)
raw_dir_path = Path(raw_dir_path)
output_dir_path = Path(output_dir_path)
output_dir_path.mkdir(parents=True, exist_ok=True)

uuid_file_path = output_dir_path.parent / "all_processed_uuids.txt"

if uuid_file_path.exists():
with open(file=uuid_file_path, mode="r") as io:
all_processed_uuids = set(json.load(fp=io))
return all_processed_uuids

photometry_uuids = pd.read_parquet(
processed_path / "dlight_raw_data/dlight_photometry_processed_full.parquet", columns=["uuid"]
)
photometry_uuids = set(photometry_uuids["uuid"])
unique_photometry_uuids = set(photometry_uuids["uuid"])
del photometry_uuids

reinforcement_uuids = pd.read_parquet(
processed_path / "optoda_raw_data/closed_loop_behavior.parquet", columns=["uuid"]
)
reinforcement_uuids = set(reinforcement_uuids["uuid"])
unique_reinforcement_uuids = set(reinforcement_uuids["uuid"])
del reinforcement_uuids

velocity_uuids = pd.read_parquet(
processed_path / "optoda_raw_data/closed_loop_behavior_velocity_conditioned.parquet", columns=["uuid"]
)
velocity_uuids = set(velocity_uuids["uuid"])
all_processed_uuids = photometry_uuids.union(reinforcement_uuids).union(velocity_uuids)
unique_velocity_uuids = set(velocity_uuids["uuid"])
del velocity_uuids

all_processed_uuids = unique_photometry_uuids | unique_reinforcement_uuids | unique_velocity_uuids

with open(file=uuid_file_path, mode="w") as io:
json.dump(obj=list(all_processed_uuids), fp=io)
return all_processed_uuids


def dataset_to_nwb(
*,
processed_path: Union[str, Path],
raw_dir_path: Union[str, Path],
output_dir_path: Union[str, Path],
skip_sessions: set,
number_of_jobs: int,
num_sessions_per_experiment: int = None,
):
processed_path = Path(processed_path)
raw_dir_path = Path(raw_dir_path)
output_dir_path = Path(output_dir_path)

log_folder_path = output_dir_path.parent / "logs"
log_folder_path.mkdir(exist_ok=True)

missing_folder_path = output_dir_path.parent / "missing"
missing_folder_path.mkdir(exist_ok=True)

all_processed_uuids = get_all_processed_uuids(processed_path=processed_path, output_dir_path=output_dir_path)

experimental_folders = [
folder for folder in raw_dir_path.iterdir() if folder.is_dir() and folder.name not in skip_experiments
folder
for folder in raw_dir_path.iterdir()
if folder.is_dir() and folder.name not in skip_experiments and folder.name.startswith("_")
]
for experimental_folder in tqdm(experimental_folders):
for experimental_folder in tqdm(iterable=experimental_folders, position=0, desc="Converting experiments..."):
experiment_type = folder_name_to_experiment_type[experimental_folder.name]
session_folders = [
folder for folder in experimental_folder.iterdir() if folder.is_dir() and folder.name not in skip_sessions
]
if num_sessions is None:
num_sessions = len(session_folders) + 1
if num_sessions_per_experiment is None:
num_sessions_per_experiment = len(session_folders) + 1
session_num = 0
for session_folder in session_folders:
print(f"Processing {session_folder.name}")
results_file = session_folder / "proc" / "results_00.yaml"
results = load_dict_from_file(results_file)
session_uuid = results["uuid"]
if session_uuid not in all_processed_uuids:
continue
session_to_nwb(
session_uuid=session_uuid,
experiment_type=experiment_type,
processed_path=processed_path,
raw_path=session_folder,
output_dir_path=output_dir_path,

futures = list()
with ProcessPoolExecutor(max_workers=number_of_jobs) as executor:
for session_folder in session_folders:
error_identifier_base = f"{experimental_folder.name}_{session_folder.name}"

results_file = session_folder / "proc" / "results_00.yaml"

if not results_file.exists():
(missing_folder_path / f"{error_identifier_base}.txt").touch()
continue

results = load_dict_from_file(results_file)
session_uuid = results["uuid"]
if session_uuid not in all_processed_uuids:
continue

futures.append(
executor.submit(
_safe_session_to_nwb,
session_uuid=session_uuid,
experiment_type=experiment_type,
processed_path=processed_path,
raw_path=session_folder,
output_dir_path=output_dir_path,
log_file_path=log_folder_path / f"{error_identifier_base}_{session_uuid}.txt",
),
)

session_num += 1
if session_num >= num_sessions_per_experiment:
break

parallel_iterable = tqdm(
iterable=as_completed(futures), position=1, desc="Converting sessions in parallel..."
)
session_num += 1
if session_num >= num_sessions:
break
for _ in parallel_iterable:
pass


if __name__ == "__main__":
processed_path = Path("NWB/DattaConv/processed_data")
raw_dir_path = Path("NWB/DattaConv/raw_data")
output_dir_path = Path("NWB/DattaConv/conversion_output")
if output_dir_path.exists():
shutil.rmtree(
output_dir_path, ignore_errors=True
) # ignore errors due to MacOS race condition (https://github.com/python/cpython/issues/81441)
number_of_jobs = 4

processed_path = Path("E:/Datta/dopamine-reinforces-spontaneous-behavior")
raw_dir_path = Path("E:/Datta")
output_dir_path = Path("E:/datta_output/files")

skip_experiments = {
"keypoint", # no proc folder for keypoints
}
skip_sessions = {
temporary_skip_sessions = {
"session_20210420113646-974717", # _aggregate_results_arhmm_photometry_excitation_pulsed_01: missing everything except depth video
"session_20210309134748-687283", # _aggregate_results_arhmm_excitation_03: missing everything except depth video
"session_20210224083612-947426", # _aggregate_results_arhmm_excitation_03: missing proc folder
@@ -120,5 +181,14 @@ def dataset_to_nwb(
"session_20210429135801-758690", # _aggregate_results_arhmm_09: missing everything except depth video
"session_20191111130454-333065", # _aggregate_results_arhmm_05: missing proc folder
"session_20191111130847-263894", # _aggregate_results_arhmm_05: missing proc folder
"session_20200720110309-817092",
"session_20210115130943-880998",
}
dataset_to_nwb(processed_path, raw_dir_path, output_dir_path, skip_sessions, num_sessions=1)
dataset_to_nwb(
processed_path=processed_path,
raw_dir_path=raw_dir_path,
output_dir_path=output_dir_path,
skip_sessions=temporary_skip_sessions,
number_of_jobs=number_of_jobs,
num_sessions_per_experiment=1,
)
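
For reference, here is a minimal, self-contained sketch of the submission pattern the reworked dataset_to_nwb relies on: a ProcessPoolExecutor per experiment folder, all futures submitted up front, then drained with as_completed under a tqdm progress bar so the bar advances as workers finish in arbitrary order. The worker function, its argument, and the session ids below are hypothetical placeholders, not the project's code.

from concurrent.futures import ProcessPoolExecutor, as_completed

from tqdm import tqdm


def _convert_one(session_id: str) -> str:
    # Stand-in for _safe_session_to_nwb: does the per-session work and never raises.
    return session_id


def convert_all(session_ids: list, number_of_jobs: int = 4) -> None:
    futures = []
    with ProcessPoolExecutor(max_workers=number_of_jobs) as executor:
        for session_id in session_ids:
            futures.append(executor.submit(_convert_one, session_id))

        # Iterating as_completed inside the with-block keeps the pool alive while
        # results stream back; each .result() re-raises any worker-side exception.
        for future in tqdm(iterable=as_completed(futures), total=len(futures), desc="Converting sessions..."):
            future.result()


if __name__ == "__main__":
    convert_all(session_ids=[f"session_{index:02d}" for index in range(8)])
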
src/datta_lab_to_nwb/markowitz_gillis_nature_2023/convert_session.py
@@ -1,18 +1,42 @@
"""Primary script to run to convert an entire session for of data using the NWBConverter."""
# Standard Library
from pathlib import Path
import shutil
import traceback
from pathlib import Path
from typing import Union, Literal

# Third Party
from neuroconv.utils import dict_deep_update, load_dict_from_file
from pynwb import NWBHDF5IO

# Local
from datta_lab_to_nwb.markowitz_gillis_nature_2023.postconversion import reproduce_fig1d
from datta_lab_to_nwb.markowitz_gillis_nature_2023.nwbconverter import DattaNWBConverter


def _safe_session_to_nwb(
*,
session_uuid: str,
processed_path: Union[str, Path],
raw_path: Union[str, Path],
output_dir_path: Union[str, Path],
experiment_type: Literal["reinforcement", "photometry", "reinforcement-photometry", "velocity-modulation"],
log_file_path: Path,
processed_only: bool = False,
stub_test: bool = False,
):
try:
session_to_nwb(
session_uuid=session_uuid,
processed_path=processed_path,
raw_path=raw_path,
output_dir_path=output_dir_path,
experiment_type=experiment_type,
processed_only=processed_only,
stub_test=stub_test,
)
except Exception as exception:
with open(file=log_file_path, mode="w") as io:
io.write(f"{type(exception)}: {str(exception)}\n\n{traceback.format_exc()}")


def session_to_nwb(
session_uuid: str,
processed_path: Union[str, Path],
@@ -139,7 +163,9 @@ def session_to_nwb(
metadata = dict_deep_update(metadata, paper_metadata)

# Run conversion
converter.run_conversion(metadata=metadata, nwbfile_path=nwbfile_path, conversion_options=conversion_options)
converter.run_conversion(
metadata=metadata, nwbfile_path=nwbfile_path, conversion_options=conversion_options, overwrite=True
)


if __name__ == "__main__":
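
Because _safe_session_to_nwb only writes a log file when a conversion raises, the log folder doubles as a failure report after a dataset run. A short, hypothetical post-run check; the folder path is an assumption that mirrors output_dir_path.parent / "logs" from convert_dataset.py and should be adjusted to the actual run.

from pathlib import Path

log_folder_path = Path("E:/datta_output/logs")  # assumed location

failed_logs = sorted(log_folder_path.glob("*.txt"))
print(f"{len(failed_logs)} session(s) failed conversion")
for log_file in failed_logs:
    # File stem is "<experiment folder>_<session folder>_<session uuid>", as written by _safe_session_to_nwb.
    print(f" - {log_file.stem}")
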
@@ -115,6 +115,12 @@ def add_to_nwbfile(self, nwbfile: NWBFile, metadata: dict, velocity_modulation:
],
)

# Reconstruct optogenetic series from feedback status
if pd.isnull(metadata["Optogenetics"]["stim_frequency_Hz"]): # cts stim
data, timestamps = self.reconstruct_cts_stim(metadata, session_df, session_timestamps)
else: # pulsed stim
data, timestamps = self.reconstruct_pulsed_stim(metadata, session_df, session_timestamps)

device = nwbfile.create_device(
name="OptoEngineMRL",
description="Optogenetic stimulator (Opto Engine MRL-III-635; SKU: RD-635-00500-CWM-SD-03-LED-0)",
@@ -127,11 +133,6 @@ def add_to_nwbfile(self, nwbfile: NWBFile, metadata: dict, velocity_modulation:
excitation_lambda=635.0,
location=metadata["Optogenetics"]["area"],
)
# Reconstruct optogenetic series from feedback status
if pd.isnull(metadata["Optogenetics"]["stim_frequency_Hz"]): # cts stim
data, timestamps = self.reconstruct_cts_stim(metadata, session_df, session_timestamps)
else: # pulsed stim
data, timestamps = self.reconstruct_pulsed_stim(metadata, session_df, session_timestamps)
id2sorted_index = metadata["BehavioralSyllable"]["id2sorted_index"]
target_syllables = [id2sorted_index[syllable_id] for syllable_id in metadata["Optogenetics"]["target_syllable"]]
ogen_series = OptogeneticSeries(
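
The relocated block dispatches on whether the metadata carries a stimulation frequency: a null value means continuous stimulation, a number means pulsed. Below is a minimal standalone sketch of that dispatch; the helper is a hypothetical stand-in, not the interface's reconstruct_* methods. pd.isnull is used because it treats both None and NaN as missing, which a plain `is None` check would not.

import pandas as pd


def choose_reconstruction(stim_frequency_hz) -> str:
    # Null/NaN frequency -> continuous stimulation; numeric frequency -> pulsed.
    if pd.isnull(stim_frequency_hz):
        return "continuous"  # the interface would call reconstruct_cts_stim(...)
    return "pulsed"  # the interface would call reconstruct_pulsed_stim(...)


print(choose_reconstruction(float("nan")))  # -> continuous
print(choose_reconstruction(25.0))  # -> pulsed
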
@@ -132,7 +132,7 @@ def add_to_nwbfile(self, nwbfile: NWBFile, metadata: dict) -> None:
),
data=H5DataIO(commanded_signal, compression=True),
frequency=metadata["FiberPhotometry"]["signal_freq"],
power=metadata["FiberPhotometry"]["signal_amp"], # TODO: Fix this in ndx-photometry
power=float(metadata["FiberPhotometry"]["signal_amp"]), # TODO: Fix this in ndx-photometry
timestamps=H5DataIO(timestamps, compression=True),
unit="volts",
)
@@ -144,7 +144,7 @@ def add_to_nwbfile(self, nwbfile: NWBFile, metadata: dict) -> None:
),
data=H5DataIO(commanded_reference, compression=True),
frequency=metadata["FiberPhotometry"]["reference_freq"],
power=metadata["FiberPhotometry"]["reference_amp"], # TODO: Fix this in ndx-photometry
power=float(metadata["FiberPhotometry"]["reference_amp"]), # TODO: Fix this in ndx-photometry
timestamps=commanded_signal_series.timestamps,
unit="volts",
)

This file was deleted.
