Merge pull request #55 from catalystneuro/run_processing_only
Update to BWM
CodyCBakerPhD authored Aug 2, 2023
2 parents f76efbe + 1f01953 commit 5e0041d
Showing 12 changed files with 189 additions and 29 deletions.
@@ -3,7 +3,9 @@ NWBFile:
IBL aims to understand the neural basis of decision-making in the mouse by gathering a whole-brain activity map composed of electrophysiological recordings pooled from multiple laboratories. We have systematically recorded from nearly all major brain areas with Neuropixels probes, using a grid system for unbiased sampling and replicating each recording site in at least two laboratories. These data have been used to construct a brain-wide map of activity at single-spike cellular resolution during a decision-making task. In addition to the map, this data set contains other information gathered during the task: sensory stimuli presented to the mouse; mouse decisions and response times; and mouse pose information from video recordings and DeepLabCut analysis.
session_description: |
The full description of the session/task protocol can be found in Appendix 2 of International Brain Laboratory, et al. "Standardized and reproducible measurement of decision-making in mice." Elife 10 (2021); e63711.
related_publications: "https://doi.org/10.6084/m9.figshare.21400815.v6, https://doi.org/10.1101/2020.01.17.909838" # Brain Wide Map white paper and behavior task protocol information
related_publications:
- "https://doi.org/10.6084/m9.figshare.21400815.v6"
- "https://doi.org/10.1101/2020.01.17.909838" # Brain Wide Map white paper and behavior task protocol information
Subject:
description: |
Mice were housed under a 12/12 h light/dark cycle (normal or inverted depending on the laboratory) with food and water available ad libitum, except during behavioural training days. Electrophysiological recordings and behavioural training were performed during either the dark or light phase of the subject cycle depending on the laboratory. Subjects were obtained from either the Jackson Laboratory or Charles River.
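The related_publications change above, from one comma-joined string to a YAML list, gives each DOI its own entry once the metadata reaches pynwb, which accepts related_publications as a list of strings. A minimal standalone sketch (placeholder values, not this repository's converter code):

from datetime import datetime, timezone

from pynwb import NWBFile

nwbfile = NWBFile(
    session_description="example session",  # placeholder values for illustration only
    identifier="example-identifier",
    session_start_time=datetime(2023, 8, 2, tzinfo=timezone.utc),
    related_publications=[
        "https://doi.org/10.6084/m9.figshare.21400815.v6",
        "https://doi.org/10.1101/2020.01.17.909838",
    ],
)
print(nwbfile.related_publications)  # the two DOIs are stored as separate entries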
@@ -0,0 +1,164 @@
import os

os.environ["JUPYTER_PLATFORM_DIRS"] = "1" # Annoying

import os
import traceback
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from shutil import rmtree
from tempfile import mkdtemp

from dandi.download import download as dandi_download
from dandi.organize import organize as dandi_organize
from dandi.upload import upload as dandi_upload
from neuroconv.tools.data_transfers import automatic_dandi_upload
from nwbinspector.tools import get_s3_urls_and_dandi_paths
from one.api import ONE
from pynwb import NWBHDF5IO
from pynwb.image import ImageSeries
from tqdm import tqdm

from ibl_to_nwb.updated_conversion.brainwide_map import BrainwideMapConverter
from ibl_to_nwb.updated_conversion.brainwide_map.datainterfaces import (
BrainwideMapTrialsInterface,
)
from ibl_to_nwb.updated_conversion.datainterfaces import (
IblPoseEstimationInterface,
IblSortingInterface,
IblStreamingApInterface,
IblStreamingLfInterface,
LickInterface,
PupilTrackingInterface,
RoiMotionEnergyInterface,
WheelInterface,
)


def convert_and_upload_parallel_processed_only(
base_path: Path,
session: str,
nwbfile_path: str,
stub_test: bool = False,
progress_position: int = 0,
cleanup: bool = False,
files_mode: str = "move",
):
try:
assert len(os.environ.get("DANDI_API_KEY", "")) > 0, "Run `export DANDI_API_KEY=...`!"

nwbfile_path.parent.mkdir(exist_ok=True)

# Download behavior and spike sorted data for this session
session_path = base_path / "ibl_conversion" / session
cache_folder = base_path / "ibl_conversion" / session / "cache"
session_one = ONE(
base_url="https://openalyx.internationalbrainlab.org",
password="international",
silent=True,
cache_dir=cache_folder,
)

# Initialize as many of each interface as we need across the streams
data_interfaces = list()

# These interfaces should always be present in source data
data_interfaces.append(IblSortingInterface(session=session, cache_folder=cache_folder / "sorting"))
data_interfaces.append(BrainwideMapTrialsInterface(one=session_one, session=session))
data_interfaces.append(WheelInterface(one=session_one, session=session))

# These interfaces may not be present; check if they are before adding to list
pose_estimation_files = session_one.list_datasets(eid=session, filename="*.dlc*")
for pose_estimation_file in pose_estimation_files:
camera_name = pose_estimation_file.replace("alf/_ibl_", "").replace(".dlc.pqt", "")
data_interfaces.append(
IblPoseEstimationInterface(
one=session_one, session=session, camera_name=camera_name, include_video=False
)
)

pupil_tracking_files = session_one.list_datasets(eid=session, filename="*features*")
for pupil_tracking_file in pupil_tracking_files:
camera_name = pupil_tracking_file.replace("alf/_ibl_", "").replace(".features.pqt", "")
data_interfaces.append(PupilTrackingInterface(one=session_one, session=session, camera_name=camera_name))

roi_motion_energy_files = session_one.list_datasets(eid=session, filename="*ROIMotionEnergy.npy*")
for roi_motion_energy_file in roi_motion_energy_files:
camera_name = roi_motion_energy_file.replace("alf/", "").replace(".ROIMotionEnergy.npy", "")
data_interfaces.append(RoiMotionEnergyInterface(one=session_one, session=session, camera_name=camera_name))

if session_one.list_datasets(eid=session, collection="alf", filename="licks*"):
data_interfaces.append(LickInterface(one=session_one, session=session))

# Run conversion
session_converter = BrainwideMapConverter(
one=session_one, session=session, data_interfaces=data_interfaces, verbose=False
)

metadata = session_converter.get_metadata()
metadata["NWBFile"]["session_id"] = metadata["NWBFile"]["session_id"] + "-processed-only"

session_converter.run_conversion(
nwbfile_path=nwbfile_path,
metadata=metadata,
overwrite=True,
)
automatic_dandi_upload(
dandiset_id="000409",
nwb_folder_path=nwbfile_path.parent,
cleanup=cleanup, # files_mode=files_mode
)
if cleanup:
rmtree(cache_folder)
rmtree(nwbfile_path.parent)

return 1
except Exception as exception:
error_file_path = base_path / "errors" / "7-30-23" / f"{session}_error.txt"
error_file_path.parent.mkdir(exist_ok=True)
with open(file=error_file_path, mode="w") as file:
file.write(f"{type(exception)}: {str(exception)}\n{traceback.format_exc()}")
return 0


number_of_parallel_jobs = 8
base_path = Path("/home/jovyan/IBL") # prototype on DANDI Hub for now

session_retrieval_one = ONE(
base_url="https://openalyx.internationalbrainlab.org", password="international", silent=True
)
brain_wide_sessions = [
session_info["id"]
for session_info in session_retrieval_one.alyx.rest(url="sessions", action="list", tag="2022_Q4_IBL_et_al_BWM")
]

# Already written sessions
dandi_file_paths = list(get_s3_urls_and_dandi_paths(dandiset_id="000409").values())
dandi_processed_file_paths = [dandi_file_path for dandi_file_path in dandi_file_paths if "processed" in dandi_file_path]
already_written_processed_sessions = [
processed_dandi_file_path.split("ses-")[1].split("_")[0].strip("-processed-only")
for processed_dandi_file_path in dandi_processed_file_paths
]
sessions_to_run = list(set(brain_wide_sessions) - set(already_written_processed_sessions))

with ProcessPoolExecutor(max_workers=number_of_parallel_jobs) as executor:
with tqdm(total=len(sessions_to_run), position=0, desc="Converting sessions...") as main_progress_bar:
futures = []
for progress_position, session in enumerate(sessions_to_run):
nwbfile_path = base_path / "nwbfiles" / session / f"{session}.nwb"
nwbfile_path.parent.mkdir(exist_ok=True)
futures.append(
executor.submit(
convert_and_upload_parallel_processed_only,
base_path=base_path,
session=session,
nwbfile_path=nwbfile_path,
progress_position=1 + progress_position,
# stub_test=True,
# files_mode="copy", # useful when debugging
# cleanup=True, # causing shutil error ATM
)
)
for future in as_completed(futures):
status = future.result()
main_progress_bar.update(1)
@@ -28,7 +28,7 @@ def get_timestamps(self):
def align_timestamps(self):
pass

def run_conversion(self, nwbfile: NWBFile, metadata: dict):
def add_to_nwbfile(self, nwbfile: NWBFile, metadata: dict):
trials = self.one.load_object(id=self.session, obj="trials", collection="alf")

column_ordering = [
@@ -73,6 +73,7 @@ def run_conversion(self, nwbfile: NWBFile, metadata: dict):
)
)

# compression only works using the method above; method below fails
# for start_time, stop_time in trials["intervals"]:
# nwbfile.add_trial(start_time=start_time, stop_time=stop_time)

@@ -10,10 +10,11 @@


class IblPoseEstimationInterface(BaseDataInterface):
def __init__(self, one: ONE, session: str, camera_name: str):
def __init__(self, one: ONE, session: str, camera_name: str, include_video: bool):
self.one = one
self.session = session
self.camera_name = camera_name
self.include_video = include_video

def get_original_timestamps(self):
pass
@@ -24,7 +25,7 @@ def get_timestamps(self):
def align_timestamps(self):
pass

def run_conversion(self, nwbfile, metadata: dict):
def add_to_nwbfile(self, nwbfile, metadata: dict):
# Sometimes the DLC data has been revised, possibly multiple times
# Always use the most recent revision available
session_files = self.one.list_datasets(eid=self.session, filename=f"*{self.camera_name}.dlc*")
@@ -76,7 +77,9 @@ def run_conversion(self, nwbfile, metadata: dict):
source_software="DeepLabCut",
nodes=body_parts,
)
if self.one.list_datasets(eid=self.session, filename=f"raw_video_data/*{self.camera_name}*"):
if self.include_video and self.one.list_datasets(
eid=self.session, filename=f"raw_video_data/*{self.camera_name}*"
):
original_video_file = self.one.load_dataset(
id=self.session, dataset=f"raw_video_data/*{self.camera_name}*", download_only=True
)
@@ -47,11 +47,6 @@ def __init__(self, session: str, cache_folder: Optional[DirectoryPath] = None):
cluster_ids.extend(list(np.array(clusters["metrics"]["cluster_id"]) + unit_id_per_probe_shift))
number_of_units = len(np.unique(spikes["clusters"]))

print(f"{spikes['clusters']}=")
print(f"{len(set(spikes['clusters']))}=")
print(f"{clusters['metrics']['cluster_id']}=")
print(f"{clusters['metrics']}=")

# TODO - compare speed against iterating over unique cluster IDs + vector index search
for spike_cluster, spike_times, spike_amplitudes, spike_depths in zip(
spikes["clusters"], spikes["times"], spikes["amps"], spikes["depths"]
@@ -137,7 +137,7 @@ def get_metadata(self) -> dict:

return metadata

def run_conversion(self, **kwargs):
def add_to_nwbfile(self, **kwargs):
# The buffer and chunk shapes must be set explicitly for good performance with the streaming
# Otherwise, the default buffer/chunk shapes might re-request the same data packet multiple times
chunk_frames = 100 if kwargs.get("stub_test", False) else 30_000
@@ -156,7 +156,7 @@ def run_conversion(self, **kwargs):
if "progress_position" in kwargs:
kwargs.pop("progress_position")
kwargs.update(es_key=self.es_key)
super().run_conversion(**kwargs)
super().add_to_nwbfile(**kwargs)


class IblStreamingLfInterface(IblStreamingApInterface):
@@ -20,7 +20,7 @@ def get_timestamps(self):
def align_timestamps(self):
pass

def run_conversion(self, nwbfile, metadata: dict):
def add_to_nwbfile(self, nwbfile, metadata: dict):
licks = self.one.load_object(id=self.session, obj="licks", collection="alf")

lick_events_table = DynamicTable(
@@ -33,7 +33,7 @@ def get_timestamps(self):
def align_timestamps(self):
pass

def run_conversion(self, nwbfile, metadata: dict):
def add_to_nwbfile(self, nwbfile, metadata: dict):
left_or_right = self.camera_name[:5].rstrip("C")

camera_data = self.one.load_object(id=self.session, obj=self.camera_name, collection="alf")
@@ -21,7 +21,7 @@ def get_timestamps(self):
def align_timestamps(self):
pass

def run_conversion(self, nwbfile, metadata: dict):
def add_to_nwbfile(self, nwbfile, metadata: dict):
left_right_or_body = self.camera_name[:5].rstrip("C")

camera_data = self.one.load_object(id=self.session, obj=self.camera_name, collection="alf")
@@ -31,7 +31,7 @@ def get_timestamps(self):
def align_timestamps(self):
pass

def run_conversion(self, nwbfile, metadata: dict):
def add_to_nwbfile(self, nwbfile, metadata: dict):
wheel_moves = self.one.load_object(id=self.session, obj="wheelMoves", collection="alf")
wheel = self.one.load_object(id=self.session, obj="wheel", collection="alf")

@@ -40,9 +40,7 @@ def run_conversion(self, nwbfile, metadata: dict):
interpolated_position, interpolated_timestamps = wheel_methods.interpolate_position(
re_ts=wheel["timestamps"], re_pos=wheel["position"], freq=interpolation_frequency
)
velocity, acceleration = wheel_methods.velocity_filtered(
pos=interpolated_position, freq=interpolation_frequency
)
velocity, acceleration = wheel_methods.velocity_filtered(pos=interpolated_position, fs=interpolation_frequency)

# Deterministically regular
interpolated_starting_time = interpolated_timestamps[0]
11 changes: 4 additions & 7 deletions ibl_to_nwb/updated_conversion/iblconverter.py
@@ -120,11 +120,8 @@ def run_conversion(
metadata = self.get_metadata()
self.validate_metadata(metadata=metadata)

if conversion_options is None:
conversion_options = dict()
default_conversion_options = self.get_conversion_options()
conversion_options_to_run = dict_deep_update(default_conversion_options, conversion_options)
self.validate_conversion_options(conversion_options=conversion_options_to_run)
conversion_options = conversion_options or dict()
self.validate_conversion_options(conversion_options=conversion_options)

with make_or_load_nwbfile(
nwbfile_path=nwbfile_path,
@@ -135,8 +132,8 @@
) as nwbfile_out:
nwbfile_out.subject = ibl_subject
for interface_name, data_interface in self.data_interface_objects.items():
data_interface.run_conversion(
nwbfile=nwbfile_out, metadata=metadata, **conversion_options_to_run.get(interface_name, dict())
data_interface.add_to_nwbfile(
nwbfile=nwbfile_out, metadata=metadata, **conversion_options.get(interface_name, dict())
)

return nwbfile_out
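The net effect across these files is that each data interface now writes into the shared NWBFile through add_to_nwbfile(nwbfile, metadata) rather than run_conversion, with the converter forwarding per-interface conversion options to that hook. A minimal sketch of an interface written against this pattern, assuming the pinned fix_for_ibl branch of neuroconv where interfaces implement add_to_nwbfile and the timestamp hooks shown in the diffs above (the class and series names are illustrative, not part of the repository):

import numpy as np
from neuroconv import BaseDataInterface
from pynwb import NWBFile, TimeSeries


class ExampleSignalInterface(BaseDataInterface):
    def __init__(self, one, session: str):
        self.one = one
        self.session = session

    def get_original_timestamps(self):
        pass

    def get_timestamps(self):
        pass

    def align_timestamps(self):
        pass

    def add_to_nwbfile(self, nwbfile: NWBFile, metadata: dict):
        # Called once per interface by the converter inside make_or_load_nwbfile,
        # so each interface only appends its own objects to the in-memory file.
        nwbfile.add_acquisition(
            TimeSeries(name="example_signal", data=np.zeros(10), unit="a.u.", rate=30.0)
        )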
6 changes: 3 additions & 3 deletions ibl_to_nwb/updated_conversion/requirements.txt
@@ -1,7 +1,7 @@
neuroconv[spikeglx]==0.2.4
spikeinterface>=0.97.0 # Actually need to use the streaming IBL fix for maximum efficiency
neuroconv[spikeglx] @ git+https://github.com/catalystneuro/neuroconv.git@fix_for_ibl
spikeinterface>=0.98.2 # Actually need to use the streaming IBL fix for maximum efficiency
ONE-api>=1.16.3
ibllib>=2.21.0
ndx-pose>=0.1.1
ndx-ibl==0.1.0
probeinterface==0.2.15 # v0.2.16 causes issue in this environment only (likely combo of past numpy and Python 3.8)
#probeinterface==0.2.15 # v0.2.16 causes issue in DANDI Hub environment only (likely combo of past numpy and Python 3.8)
