diff --git a/.github/workflows/deploy-tests.yml b/.github/workflows/deploy-tests.yml index a18fe8310..a2e56b00a 100644 --- a/.github/workflows/deploy-tests.yml +++ b/.github/workflows/deploy-tests.yml @@ -69,6 +69,9 @@ jobs: if: ${{ needs.assess-file-changes.outputs.SOURCE_CHANGED == 'true' }} uses: ./.github/workflows/live-service-testing.yml secrets: + AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + S3_GIN_BUCKET: ${{ secrets.S3_GIN_BUCKET }} DANDI_API_KEY: ${{ secrets.DANDI_API_KEY }} with: # Ternary operator: condition && value_if_true || value_if_false python-versions: ${{ github.event.pull_request.draft == true && '["3.9"]' || needs.load_python_and_os_versions.outputs.ALL_PYTHON_VERSIONS }} diff --git a/.github/workflows/live-service-testing.yml b/.github/workflows/live-service-testing.yml index 24eda7bc3..155438fb2 100644 --- a/.github/workflows/live-service-testing.yml +++ b/.github/workflows/live-service-testing.yml @@ -13,6 +13,12 @@ on: type: string secrets: + AWS_ACCESS_KEY_ID: + required: true + AWS_SECRET_ACCESS_KEY: + required: true + S3_GIN_BUCKET: + required: true DANDI_API_KEY: required: true @@ -45,7 +51,17 @@ jobs: - name: Install full requirements run: pip install .[test,full] + - name: Prepare data for tests + uses: ./.github/actions/load-data + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + s3-gin-bucket: ${{ secrets.S3_GIN_BUCKET }} + os: ${{ matrix.os }} + - name: Run subset of tests that use DANDI live services run: pytest -rsx -n auto tests/test_minimal/test_tools/dandi_transfer_tools.py + - name: Run subset of tests that use DANDI live services with YAML + run: pytest -rsx -n auto tests/test_on_data/test_yaml/yaml_dandi_transfer_tools.py - name: Run subset of tests that use Globus live services run: pytest -rsx -n auto tests/test_minimal/test_tools/globus_transfer_tools.py diff --git a/.github/workflows/neuroconv_deployment_aws_tests.yml b/.github/workflows/neuroconv_deployment_aws_tests.yml new file mode 100644 index 000000000..64aae5ec9 --- /dev/null +++ b/.github/workflows/neuroconv_deployment_aws_tests.yml @@ -0,0 +1,46 @@ +name: NeuroConv Deployment AWS Tests +on: + schedule: + - cron: "0 16 * * 3" # Weekly at noon on Wednesday + workflow_dispatch: + +concurrency: # Cancel previous workflows on the same pull request + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +env: + AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + RCLONE_DRIVE_ACCESS_TOKEN: ${{ secrets.RCLONE_DRIVE_ACCESS_TOKEN }} + RCLONE_DRIVE_REFRESH_TOKEN: ${{ secrets.RCLONE_DRIVE_REFRESH_TOKEN }} + RCLONE_EXPIRY_TOKEN: ${{ secrets.RCLONE_EXPIRY_TOKEN }} + DANDI_API_KEY: ${{ secrets.DANDI_API_KEY }} + +jobs: + run: + name: ${{ matrix.os }} Python ${{ matrix.python-version }} + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + python-version: ["3.12"] + os: [ubuntu-latest] + steps: + - uses: actions/checkout@v4 + - run: git fetch --prune --unshallow --tags + - name: Setup Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Global Setup + run: | + python -m pip install -U pip # Official recommended way + git config --global user.email "CI@example.com" + git config --global user.name "CI Almighty" + + - name: Install AWS requirements + run: pip install 
.[aws,test] + + - name: Run NeuroConv Deployment on AWS tests + run: pytest -rsx -n auto tests/test_on_data/test_yaml/neuroconv_deployment_aws_tools_tests.py diff --git a/CHANGELOG.md b/CHANGELOG.md index a37093e39..11dce4e7a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,35 +1,43 @@ -# Upcoming - -## Features -* Added the `rclone_transfer_batch_job` helper function for executing Rclone data transfers in AWS Batch jobs. [PR #1085](https://github.com/catalystneuro/neuroconv/pull/1085) - - - -## v0.6.4 +# v0.6.6 (Upcoming) ## Deprecations -* Completely removed compression settings from most places [PR #1126](https://github.com/catalystneuro/neuroconv/pull/1126) +* Removed use of `jsonschema.RefResolver` as it will be deprecated from the jsonschema library [PR #1133](https://github.com/catalystneuro/neuroconv/pull/1133) +* Completely removed compression settings from most places [PR #1126](https://github.com/catalystneuro/neuroconv/pull/1126) ## Bug Fixes * datetime objects now can be validated as conversion options [#1139](https://github.com/catalystneuro/neuroconv/pull/1126) +* Make `NWBMetaDataEncoder` public again [PR #1142](https://github.com/catalystneuro/neuroconv/pull/1142) * Fix a bug where data in `DeepLabCutInterface` failed to write when `ndx-pose` was not imported. [#1144](https://github.com/catalystneuro/neuroconv/pull/1144) +* `SpikeGLXConverterPipe` now accepts multi-probe, multi-trigger structures and does not assume a specific folder structure [#1150](https://github.com/catalystneuro/neuroconv/pull/1150) +* `SpikeGLXNIDQInterface` is no longer written as an ElectricalSeries [#1152](https://github.com/catalystneuro/neuroconv/pull/1152) + ## Features * Propagate the `unit_electrode_indices` argument from the spikeinterface tools to `BaseSortingExtractorInterface`. This allows users to map units to the electrode table when adding sorting data [PR #1124](https://github.com/catalystneuro/neuroconv/pull/1124) * Imaging interfaces have a new conversion option `always_write_timestamps` that can be used to force writing timestamps even if neuroconv's heuristics indicates regular sampling rate [PR #1125](https://github.com/catalystneuro/neuroconv/pull/1125) * Added .csv support to DeepLabCutInterface [PR #1140](https://github.com/catalystneuro/neuroconv/pull/1140) +* `SpikeGLXRecordingInterface` now also accepts `folder_path`, making its behavior equivalent to SpikeInterface [#1150](https://github.com/catalystneuro/neuroconv/pull/1150) +* Added the `rclone_transfer_batch_job` helper function for executing Rclone data transfers in AWS Batch jobs. [PR #1085](https://github.com/catalystneuro/neuroconv/pull/1085) +* Added the `deploy_neuroconv_batch_job` helper function for deploying NeuroConv AWS Batch jobs. 
[PR #1086](https://github.com/catalystneuro/neuroconv/pull/1086) +* YAML specification files now accept an outer keyword `upload_to_dandiset="< six-digit ID >"` to automatically upload the produced NWB files to the DANDI archive [PR #1089](https://github.com/catalystneuro/neuroconv/pull/1089) +* `SpikeGLXNIDQInterface` now handles demuxed digital channels (`XD0`) [#1152](https://github.com/catalystneuro/neuroconv/pull/1152) + + + ## Improvements * Use mixing tests for ecephy's mocks [PR #1136](https://github.com/catalystneuro/neuroconv/pull/1136) +* Use pytest format for dandi tests to avoid a Windows permission error on teardown [PR #1151](https://github.com/catalystneuro/neuroconv/pull/1151) +* Added many docstrings for public functions [PR #1063](https://github.com/catalystneuro/neuroconv/pull/1063) # v0.6.5 (November 1, 2024) -## Deprecations - ## Bug Fixes * Fixed formatwise installation from pipy [PR #1118](https://github.com/catalystneuro/neuroconv/pull/1118) * Fixed dailies [PR #1113](https://github.com/catalystneuro/neuroconv/pull/1113) +## Deprecations + ## Features * Using in-house `GenericDataChunkIterator` [PR #1068](https://github.com/catalystneuro/neuroconv/pull/1068) * Data interfaces now perform source (argument inputs) validation with the json schema [PR #1020](https://github.com/catalystneuro/neuroconv/pull/1020) diff --git a/docs/api/utils.rst b/docs/api/utils.rst index 4f19f7cee..c9b85b14c 100644 --- a/docs/api/utils.rst +++ b/docs/api/utils.rst @@ -8,6 +8,8 @@ Dictionaries JSON Schema ----------- .. automodule:: neuroconv.utils.json_schema + :members: + :exclude-members: NWBMetaDataEncoder Common Reused Types ------------------- diff --git a/docs/conversion_examples_gallery/recording/spikeglx.rst b/docs/conversion_examples_gallery/recording/spikeglx.rst index 7f57470af..97b23bac9 100644 --- a/docs/conversion_examples_gallery/recording/spikeglx.rst +++ b/docs/conversion_examples_gallery/recording/spikeglx.rst @@ -24,7 +24,7 @@ We can easily convert all data stored in the native SpikeGLX folder structure to >>> >>> folder_path = f"{ECEPHY_DATA_PATH}/spikeglx/Noise4Sam_g0" >>> converter = SpikeGLXConverterPipe(folder_path=folder_path) - >>> + Source data is valid! 
>>> # Extract what metadata we can from the source files >>> metadata = converter.get_metadata() >>> # For data provenance we add the time zone information to the conversion diff --git a/pyproject.toml b/pyproject.toml index 5efd432f5..e318cc9c1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,7 +50,8 @@ dependencies = [ "parse>=1.20.0", "click", "docstring-parser", - "packaging" # Issue 903 + "packaging", # Issue 903 + "referencing", ] @@ -355,7 +356,7 @@ doctest_optionflags = "ELLIPSIS" [tool.black] line-length = 120 -target-version = ['py38', 'py39', 'py310'] +target-version = ['py39', 'py310'] include = '\.pyi?$' extend-exclude = ''' /( diff --git a/src/neuroconv/basedatainterface.py b/src/neuroconv/basedatainterface.py index 272abbd0c..d9e9dc11e 100644 --- a/src/neuroconv/basedatainterface.py +++ b/src/neuroconv/basedatainterface.py @@ -19,12 +19,11 @@ ) from .tools.nwb_helpers._metadata_and_file_helpers import _resolve_backend from .utils import ( - _NWBMetaDataEncoder, get_json_schema_from_method_signature, load_dict_from_file, ) from .utils.dict import DeepDict -from .utils.json_schema import _NWBSourceDataEncoder +from .utils.json_schema import _NWBMetaDataEncoder, _NWBSourceDataEncoder class BaseDataInterface(ABC): @@ -37,7 +36,14 @@ class BaseDataInterface(ABC): @classmethod def get_source_schema(cls) -> dict: - """Infer the JSON schema for the source_data from the method signature (annotation typing).""" + """ + Infer the JSON schema for the source_data from the method signature (annotation typing). + + Returns + ------- + dict + The JSON schema for the source_data. + """ return get_json_schema_from_method_signature(cls, exclude=["source_data"]) @classmethod diff --git a/src/neuroconv/datainterfaces/behavior/lightningpose/lightningposeconverter.py b/src/neuroconv/datainterfaces/behavior/lightningpose/lightningposeconverter.py index 505aa144d..62edaf140 100644 --- a/src/neuroconv/datainterfaces/behavior/lightningpose/lightningposeconverter.py +++ b/src/neuroconv/datainterfaces/behavior/lightningpose/lightningposeconverter.py @@ -111,6 +111,28 @@ def add_to_nwbfile( starting_frames_labeled_videos: Optional[list[int]] = None, stub_test: bool = False, ): + """ + Add behavior and pose estimation data, including original and labeled videos, to the specified NWBFile. + + Parameters + ---------- + nwbfile : NWBFile + The NWBFile object to which the data will be added. + metadata : dict + Metadata dictionary containing information about the behavior and videos. + reference_frame : str, optional + Description of the reference frame for pose estimation, by default None. + confidence_definition : str, optional + Definition for the confidence levels in pose estimation, by default None. + external_mode : bool, optional + If True, the videos will be referenced externally rather than embedded within the NWB file, by default True. + starting_frames_original_videos : list of int, optional + List of starting frames for the original videos, by default None. + starting_frames_labeled_videos : list of int, optional + List of starting frames for the labeled videos, by default None. + stub_test : bool, optional + If True, only a subset of the data will be added for testing purposes, by default False. 
+ """ original_video_interface = self.data_interface_objects["OriginalVideo"] original_video_metadata = next( @@ -172,6 +194,33 @@ def run_conversion( starting_frames_labeled_videos: Optional[list] = None, stub_test: bool = False, ) -> None: + """ + Run the full conversion process, adding behavior, video, and pose estimation data to an NWB file. + + Parameters + ---------- + nwbfile_path : FilePath, optional + The file path where the NWB file will be saved. If None, the file is handled in memory. + nwbfile : NWBFile, optional + An in-memory NWBFile object. If None, a new NWBFile object will be created. + metadata : dict, optional + Metadata dictionary for describing the NWB file contents. If None, it is auto-generated. + overwrite : bool, optional + If True, overwrites the NWB file at `nwbfile_path` if it exists. If False, appends to the file, by default False. + reference_frame : str, optional + Description of the reference frame for pose estimation, by default None. + confidence_definition : str, optional + Definition for confidence levels in pose estimation, by default None. + external_mode : bool, optional + If True, the videos will be referenced externally rather than embedded within the NWB file, by default True. + starting_frames_original_videos : list of int, optional + List of starting frames for the original videos, by default None. + starting_frames_labeled_videos : list of int, optional + List of starting frames for the labeled videos, by default None. + stub_test : bool, optional + If True, only a subset of the data will be added for testing purposes, by default False. + + """ if metadata is None: metadata = self.get_metadata() diff --git a/src/neuroconv/datainterfaces/behavior/medpc/medpcdatainterface.py b/src/neuroconv/datainterfaces/behavior/medpc/medpcdatainterface.py index 09f9111d7..d519dc71a 100644 --- a/src/neuroconv/datainterfaces/behavior/medpc/medpcdatainterface.py +++ b/src/neuroconv/datainterfaces/behavior/medpc/medpcdatainterface.py @@ -187,6 +187,7 @@ def add_to_nwbfile( nwbfile: NWBFile, metadata: dict, ) -> None: + ndx_events = get_package(package_name="ndx_events", installation_instructions="pip install ndx-events") medpc_name_to_info_dict = metadata["MedPC"].get("medpc_name_to_info_dict", None) assert medpc_name_to_info_dict is not None, "medpc_name_to_info_dict must be provided in metadata" diff --git a/src/neuroconv/datainterfaces/behavior/neuralynx/neuralynx_nvt_interface.py b/src/neuroconv/datainterfaces/behavior/neuralynx/neuralynx_nvt_interface.py index e161387f0..213adf731 100644 --- a/src/neuroconv/datainterfaces/behavior/neuralynx/neuralynx_nvt_interface.py +++ b/src/neuroconv/datainterfaces/behavior/neuralynx/neuralynx_nvt_interface.py @@ -8,7 +8,8 @@ from .nvt_utils import read_data, read_header from ....basetemporalalignmentinterface import BaseTemporalAlignmentInterface -from ....utils import DeepDict, _NWBMetaDataEncoder, get_base_schema +from ....utils import DeepDict, get_base_schema +from ....utils.json_schema import _NWBMetaDataEncoder from ....utils.path import infer_path diff --git a/src/neuroconv/datainterfaces/ecephys/basesortingextractorinterface.py b/src/neuroconv/datainterfaces/ecephys/basesortingextractorinterface.py index dca2dea5f..8eeb59324 100644 --- a/src/neuroconv/datainterfaces/ecephys/basesortingextractorinterface.py +++ b/src/neuroconv/datainterfaces/ecephys/basesortingextractorinterface.py @@ -220,6 +220,19 @@ def set_aligned_segment_starting_times(self, aligned_segment_starting_times: lis sorting_segment._t_start = 
aligned_segment_starting_time def subset_sorting(self): + """ + Generate a subset of the sorting extractor based on spike timing data. + + This method identifies the earliest spike time across all units in the sorting extractor and creates a + subset of the sorting data up to 110% of the earliest spike time. If the sorting extractor is associated + with a recording, the subset is further limited by the total number of samples in the recording. + + Returns + ------- + SortingExtractor + A new `SortingExtractor` object representing the subset of the original sorting data, + sliced from the start frame to the calculated end frame. + """ max_min_spike_time = max( [ min(x) diff --git a/src/neuroconv/datainterfaces/ecephys/cellexplorer/cellexplorerdatainterface.py b/src/neuroconv/datainterfaces/ecephys/cellexplorer/cellexplorerdatainterface.py index 46e825fb5..9ffeb78a2 100644 --- a/src/neuroconv/datainterfaces/ecephys/cellexplorer/cellexplorerdatainterface.py +++ b/src/neuroconv/datainterfaces/ecephys/cellexplorer/cellexplorerdatainterface.py @@ -518,6 +518,33 @@ def __init__(self, file_path: FilePath, verbose: bool = True): ) def generate_recording_with_channel_metadata(self): + """ + Generate a dummy recording extractor with channel metadata from session data. + + This method reads session data from a `.session.mat` file (if available) and generates a dummy recording + extractor. The recording extractor is then populated with channel metadata extracted from the session file. + + Returns + ------- + NumpyRecording + A `NumpyRecording` object representing the dummy recording extractor, containing the channel metadata. + + Notes + ----- + - The method reads the `.session.mat` file using `pymatreader` and extracts `extracellular` data. + - It creates a dummy recording extractor using `spikeinterface.core.numpyextractors.NumpyRecording`. + - The generated extractor includes channel IDs and other relevant metadata such as number of channels, + number of samples, and sampling frequency. + - Channel metadata is added to the dummy extractor using the `add_channel_metadata_to_recoder` function. + - If the `.session.mat` file is not found, no extractor is returned. + + Warnings + -------- + Ensure that the `.session.mat` file is correctly located in the expected session path, or the method will not generate + a recording extractor. The expected session is self.session_path / f"{self.session_id}.session.mat" + + """ + session_data_file_path = self.session_path / f"{self.session_id}.session.mat" if session_data_file_path.is_file(): from pymatreader import read_mat diff --git a/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxconverter.py b/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxconverter.py index 007c3177c..029955d24 100644 --- a/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxconverter.py +++ b/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxconverter.py @@ -29,8 +29,10 @@ def get_source_schema(cls): @classmethod def get_streams(cls, folder_path: DirectoryPath) -> list[str]: + "Return the stream ids available in the folder." 
from spikeinterface.extractors import SpikeGLXRecordingExtractor + # The first entry is the stream ids the second is the stream names return SpikeGLXRecordingExtractor.get_streams(folder_path=folder_path)[0] @validate_call @@ -61,28 +63,17 @@ def __init__( """ folder_path = Path(folder_path) - streams = streams or self.get_streams(folder_path=folder_path) + streams_ids = streams or self.get_streams(folder_path=folder_path) data_interfaces = dict() - for stream in streams: - if "ap" in stream: - probe_name = stream[:5] - file_path = ( - folder_path / f"{folder_path.stem}_{probe_name}" / f"{folder_path.stem}_t0.{probe_name}.ap.bin" - ) - es_key = f"ElectricalSeriesAP{probe_name.capitalize()}" - interface = SpikeGLXRecordingInterface(file_path=file_path, es_key=es_key) - elif "lf" in stream: - probe_name = stream[:5] - file_path = ( - folder_path / f"{folder_path.stem}_{probe_name}" / f"{folder_path.stem}_t0.{probe_name}.lf.bin" - ) - es_key = f"ElectricalSeriesLF{probe_name.capitalize()}" - interface = SpikeGLXRecordingInterface(file_path=file_path, es_key=es_key) - elif "nidq" in stream: - file_path = folder_path / f"{folder_path.stem}_t0.nidq.bin" - interface = SpikeGLXNIDQInterface(file_path=file_path) - data_interfaces.update({str(stream): interface}) # Without str() casting, is a numpy string + + nidq_streams = [stream_id for stream_id in streams_ids if stream_id == "nidq"] + electrical_streams = [stream_id for stream_id in streams_ids if stream_id not in nidq_streams] + for stream_id in electrical_streams: + data_interfaces[stream_id] = SpikeGLXRecordingInterface(folder_path=folder_path, stream_id=stream_id) + + for stream_id in nidq_streams: + data_interfaces[stream_id] = SpikeGLXNIDQInterface(folder_path=folder_path) super().__init__(data_interfaces=data_interfaces, verbose=verbose) diff --git a/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxdatainterface.py b/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxdatainterface.py index c15516431..e8b6a78c9 100644 --- a/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxdatainterface.py +++ b/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxdatainterface.py @@ -4,7 +4,7 @@ from typing import Optional import numpy as np -from pydantic import FilePath, validate_call +from pydantic import DirectoryPath, FilePath, validate_call from .spikeglx_utils import ( add_recording_extractor_properties, @@ -45,7 +45,6 @@ def get_source_schema(cls) -> dict: def _source_data_to_extractor_kwargs(self, source_data: dict) -> dict: extractor_kwargs = source_data.copy() - extractor_kwargs.pop("file_path") extractor_kwargs["folder_path"] = self.folder_path extractor_kwargs["all_annotations"] = True extractor_kwargs["stream_id"] = self.stream_id @@ -54,38 +53,63 @@ def _source_data_to_extractor_kwargs(self, source_data: dict) -> dict: @validate_call def __init__( self, - file_path: FilePath, + file_path: Optional[FilePath] = None, verbose: bool = True, es_key: Optional[str] = None, + folder_path: Optional[DirectoryPath] = None, + stream_id: Optional[str] = None, ): """ Parameters ---------- + folder_path: DirectoryPath + Folder path containing the binary files of the SpikeGLX recording. + stream_id: str, optional + Stream ID of the SpikeGLX recording. + Examples are 'imec0.ap', 'imec0.lf', 'imec1.ap', 'imec1.lf', etc. file_path : FilePathType Path to .bin file. Point to .ap.bin for SpikeGLXRecordingInterface and .lf.bin for SpikeGLXLFPInterface. verbose : bool, default: True Whether to output verbose text. 
- es_key : str, default: "ElectricalSeries" + es_key : str, the key to access the metadata of the ElectricalSeries. """ - self.stream_id = fetch_stream_id_for_spikelgx_file(file_path) - if es_key is None: - if "lf" in self.stream_id: - es_key = "ElectricalSeriesLF" - elif "ap" in self.stream_id: - es_key = "ElectricalSeriesAP" - else: - raise ValueError("Cannot automatically determine es_key from path") - file_path = Path(file_path) - self.folder_path = file_path.parent + if stream_id == "nidq": + raise ValueError( + "SpikeGLXRecordingInterface is not designed to handle nidq files. Use SpikeGLXNIDQInterface instead" + ) + + if file_path is not None and stream_id is None: + self.stream_id = fetch_stream_id_for_spikelgx_file(file_path) + self.folder_path = Path(file_path).parent + else: + self.stream_id = stream_id + self.folder_path = Path(folder_path) super().__init__( - file_path=file_path, + folder_path=folder_path, verbose=verbose, es_key=es_key, ) - self.source_data["file_path"] = str(file_path) - self.meta = self.recording_extractor.neo_reader.signals_info_dict[(0, self.stream_id)]["meta"] + + signal_info_key = (0, self.stream_id) # Key format is (segment_index, stream_id) + self._signals_info_dict = self.recording_extractor.neo_reader.signals_info_dict[signal_info_key] + self.meta = self._signals_info_dict["meta"] + + if es_key is None: + stream_kind = self._signals_info_dict["stream_kind"] # ap or lf + stream_kind_caps = stream_kind.upper() + device = self._signals_info_dict["device"].capitalize() # imec0, imec1, etc. + + electrical_series_name = f"ElectricalSeries{stream_kind_caps}" + + # Add imec{probe_index} to the electrical series name when there are multiple probes + # or undefined, `typeImEnabled` is present in the meta of all the production probes + self.probes_enabled_in_run = int(self.meta.get("typeImEnabled", 0)) + if self.probes_enabled_in_run != 1: + electrical_series_name += f"{device}" + + self.es_key = electrical_series_name # Set electrodes properties add_recording_extractor_properties(self.recording_extractor) @@ -100,7 +124,7 @@ def get_metadata(self) -> dict: device = get_device_metadata(self.meta) # Should follow pattern 'Imec0', 'Imec1', etc. 
- probe_name = self.stream_id[:5].capitalize() + probe_name = self._signals_info_dict["device"].capitalize() device["name"] = f"Neuropixel{probe_name}" # Add groups metadata diff --git a/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxnidqinterface.py b/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxnidqinterface.py index 3cf50080a..5249dfe39 100644 --- a/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxnidqinterface.py +++ b/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxnidqinterface.py @@ -1,46 +1,44 @@ +import warnings from pathlib import Path +from typing import Literal, Optional import numpy as np -from pydantic import ConfigDict, FilePath, validate_call +from pydantic import ConfigDict, DirectoryPath, FilePath, validate_call +from pynwb import NWBFile +from pynwb.base import TimeSeries from .spikeglx_utils import get_session_start_time -from ..baserecordingextractorinterface import BaseRecordingExtractorInterface +from ....basedatainterface import BaseDataInterface from ....tools.signal_processing import get_rising_frames_from_ttl -from ....utils import get_json_schema_from_method_signature +from ....tools.spikeinterface.spikeinterface import _recording_traces_to_hdmf_iterator +from ....utils import ( + calculate_regular_series_rate, + get_json_schema_from_method_signature, +) -class SpikeGLXNIDQInterface(BaseRecordingExtractorInterface): +class SpikeGLXNIDQInterface(BaseDataInterface): """Primary data interface class for converting the high-pass (ap) SpikeGLX format.""" display_name = "NIDQ Recording" - keywords = BaseRecordingExtractorInterface.keywords + ("Neuropixels",) + keywords = ("Neuropixels", "nidq", "NIDQ", "SpikeGLX") associated_suffixes = (".nidq", ".meta", ".bin") info = "Interface for NIDQ board recording data." - ExtractorName = "SpikeGLXRecordingExtractor" - stream_id = "nidq" - @classmethod def get_source_schema(cls) -> dict: source_schema = get_json_schema_from_method_signature(method=cls.__init__, exclude=["x_pitch", "y_pitch"]) source_schema["properties"]["file_path"]["description"] = "Path to SpikeGLX .nidq file." return source_schema - def _source_data_to_extractor_kwargs(self, source_data: dict) -> dict: - - extractor_kwargs = source_data.copy() - extractor_kwargs.pop("file_path") - extractor_kwargs["folder_path"] = self.folder_path - extractor_kwargs["stream_id"] = self.stream_id - return extractor_kwargs - @validate_call(config=ConfigDict(arbitrary_types_allowed=True)) def __init__( self, - file_path: FilePath, + file_path: Optional[FilePath] = None, verbose: bool = True, load_sync_channel: bool = False, es_key: str = "ElectricalSeriesNIDQ", + folder_path: Optional[DirectoryPath] = None, ): """ Read channel data from the NIDQ board for the SpikeGLX recording. @@ -49,30 +47,66 @@ def __init__( Parameters ---------- + folder_path : DirectoryPath + Path to the folder containing the .nidq.bin file. file_path : FilePathType Path to .nidq.bin file. verbose : bool, default: True Whether to output verbose text. - load_sync_channel : bool, default: False - Whether to load the last channel in the stream, which is typically used for synchronization. - If True, then the probe is not loaded. es_key : str, default: "ElectricalSeriesNIDQ" """ - self.file_path = Path(file_path) - self.folder_path = self.file_path.parent + if load_sync_channel: + + warnings.warn( + "The 'load_sync_channel' parameter is deprecated and will be removed in June 2025. 
" + "The sync channel data is only available the raw files of spikeglx`.", + DeprecationWarning, + stacklevel=2, + ) + + if file_path is None and folder_path is None: + raise ValueError("Either 'file_path' or 'folder_path' must be provided.") + + if file_path is not None: + file_path = Path(file_path) + self.folder_path = file_path.parent + + if folder_path is not None: + self.folder_path = Path(folder_path) + + from spikeinterface.extractors import SpikeGLXRecordingExtractor + + self.recording_extractor = SpikeGLXRecordingExtractor( + folder_path=self.folder_path, + stream_id="nidq", + all_annotations=True, + ) + + channel_ids = self.recording_extractor.get_channel_ids() + analog_channel_signatures = ["XA", "MA"] + self.analog_channel_ids = [ch for ch in channel_ids if "XA" in ch or "MA" in ch] + self.has_analog_channels = len(self.analog_channel_ids) > 0 + self.has_digital_channels = len(self.analog_channel_ids) < len(channel_ids) + if self.has_digital_channels: + import ndx_events # noqa: F401 + from spikeinterface.extractors import SpikeGLXEventExtractor + + self.event_extractor = SpikeGLXEventExtractor(folder_path=self.folder_path) + super().__init__( - file_path=self.file_path, verbose=verbose, load_sync_channel=load_sync_channel, es_key=es_key, + folder_path=self.folder_path, + file_path=file_path, ) - self.source_data.update(file_path=str(file_path)) - self.recording_extractor.set_property( - key="group_name", values=["NIDQChannelGroup"] * self.recording_extractor.get_num_channels() - ) - self.meta = self.recording_extractor.neo_reader.signals_info_dict[(0, "nidq")]["meta"] + self.subset_channels = None + + signal_info_key = (0, "nidq") # Key format is (segment_index, stream_id) + self._signals_info_dict = self.recording_extractor.neo_reader.signals_info_dict[signal_info_key] + self.meta = self._signals_info_dict["meta"] def get_metadata(self) -> dict: metadata = super().get_metadata() @@ -88,24 +122,206 @@ def get_metadata(self) -> dict: manufacturer="National Instruments", ) - # Add groups metadata - metadata["Ecephys"]["Device"] = [device] + metadata["Devices"] = [device] - metadata["Ecephys"]["ElectrodeGroup"][0].update( - name="NIDQChannelGroup", description="A group representing the NIDQ channels.", device=device["name"] - ) - metadata["Ecephys"]["Electrodes"] = [ - dict(name="group_name", description="Name of the ElectrodeGroup this electrode is a part of."), - ] - metadata["Ecephys"]["ElectricalSeriesNIDQ"][ - "description" - ] = "Raw acquisition traces from the NIDQ (.nidq.bin) channels." return metadata def get_channel_names(self) -> list[str]: """Return a list of channel names as set in the recording extractor.""" return list(self.recording_extractor.get_channel_ids()) + def add_to_nwbfile( + self, + nwbfile: NWBFile, + metadata: Optional[dict] = None, + stub_test: bool = False, + starting_time: Optional[float] = None, + write_as: Literal["raw", "lfp", "processed"] = "raw", + write_electrical_series: bool = True, + iterator_type: Optional[str] = "v2", + iterator_opts: Optional[dict] = None, + always_write_timestamps: bool = False, + ): + """ + Add NIDQ board data to an NWB file, including both analog and digital channels if present. + + Parameters + ---------- + nwbfile : NWBFile + The NWB file to which the NIDQ data will be added + metadata : Optional[dict], default: None + Metadata dictionary with device information. 
If None, uses default metadata + stub_test : bool, default: False + If True, only writes a small amount of data for testing + starting_time : Optional[float], default: None + DEPRECATED: Will be removed in June 2025. Starting time offset for the TimeSeries + write_as : Literal["raw", "lfp", "processed"], default: "raw" + DEPRECATED: Will be removed in June 2025. Specifies how to write the data + write_electrical_series : bool, default: True + DEPRECATED: Will be removed in June 2025. Whether to write electrical series data + iterator_type : Optional[str], default: "v2" + Type of iterator to use for data streaming + iterator_opts : Optional[dict], default: None + Additional options for the iterator + always_write_timestamps : bool, default: False + If True, always writes timestamps instead of using sampling rate + """ + + if starting_time is not None: + warnings.warn( + "The 'starting_time' parameter is deprecated and will be removed in June 2025. " + "Use the time alignment methods for modifying the starting time or timestamps " + "of the data if needed: " + "https://neuroconv.readthedocs.io/en/main/user_guide/temporal_alignment.html", + DeprecationWarning, + stacklevel=2, + ) + + if write_as != "raw": + warnings.warn( + "The 'write_as' parameter is deprecated and will be removed in June 2025. " + "NIDQ should always be written in the acquisition module of NWB. " + "Writing data as LFP or processed data is not supported.", + DeprecationWarning, + stacklevel=2, + ) + + if write_electrical_series is not True: + warnings.warn( + "The 'write_electrical_series' parameter is deprecated and will be removed in June 2025. " + "The option to skip the addition of the data is no longer supported. " + "This option was used in ElectricalSeries to write the electrode and electrode group " + "metadata without the raw data.", + DeprecationWarning, + stacklevel=2, + ) + + if stub_test or self.subset_channels is not None: + recording = self.subset_recording(stub_test=stub_test) + else: + recording = self.recording_extractor + + if metadata is None: + metadata = self.get_metadata() + + # Add devices + device_metadata = metadata.get("Devices", []) + for device in device_metadata: + if device["name"] not in nwbfile.devices: + nwbfile.create_device(**device) + + # Add analog and digital channels + if self.has_analog_channels: + self._add_analog_channels( + nwbfile=nwbfile, + recording=recording, + iterator_type=iterator_type, + iterator_opts=iterator_opts, + always_write_timestamps=always_write_timestamps, + ) + + if self.has_digital_channels: + self._add_digital_channels(nwbfile=nwbfile) + + def _add_analog_channels( + self, + nwbfile: NWBFile, + recording, + iterator_type: Optional[str], + iterator_opts: Optional[dict], + always_write_timestamps: bool, + ): + """ + Add analog channels from the NIDQ board to the NWB file. 
+ + Parameters + ---------- + nwbfile : NWBFile + The NWB file to add the analog channels to + recording : BaseRecording + The recording extractor containing the analog channels + iterator_type : Optional[str] + Type of iterator to use for data streaming + iterator_opts : Optional[dict] + Additional options for the iterator + always_write_timestamps : bool + If True, always writes timestamps instead of using sampling rate + """ + analog_recorder = recording.select_channels(channel_ids=self.analog_channel_ids) + channel_names = analog_recorder.get_property(key="channel_names") + segment_index = 0 + analog_data_iterator = _recording_traces_to_hdmf_iterator( + recording=analog_recorder, + segment_index=segment_index, + iterator_type=iterator_type, + iterator_opts=iterator_opts, + ) + + name = "TimeSeriesNIDQ" + description = f"Analog data from the NIDQ board. Channels are {channel_names} in that order." + time_series_kwargs = dict(name=name, data=analog_data_iterator, unit="a.u.", description=description) + + if always_write_timestamps: + timestamps = recording.get_times(segment_index=segment_index) + shifted_timestamps = timestamps + time_series_kwargs.update(timestamps=shifted_timestamps) + else: + recording_has_timestamps = recording.has_time_vector(segment_index=segment_index) + if recording_has_timestamps: + timestamps = recording.get_times(segment_index=segment_index) + rate = calculate_regular_series_rate(series=timestamps) + recording_t_start = timestamps[0] + else: + rate = recording.get_sampling_frequency() + recording_t_start = recording._recording_segments[segment_index].t_start or 0 + + if rate: + starting_time = float(recording_t_start) + time_series_kwargs.update(starting_time=starting_time, rate=recording.get_sampling_frequency()) + else: + shifted_timestamps = timestamps + time_series_kwargs.update(timestamps=shifted_timestamps) + + time_series = TimeSeries(**time_series_kwargs) + nwbfile.add_acquisition(time_series) + + def _add_digital_channels(self, nwbfile: NWBFile): + """ + Add digital channels from the NIDQ board to the NWB file as events. + + Parameters + ---------- + nwbfile : NWBFile + The NWB file to add the digital channels to + """ + from ndx_events import LabeledEvents + + event_channels = self.event_extractor.channel_ids + for channel_id in event_channels: + events_structure = self.event_extractor.get_events(channel_id=channel_id) + timestamps = events_structure["time"] + labels = events_structure["label"] + + # Some channels have no events + if timestamps.size > 0: + + # Timestamps are not ordered, the ones for off are first and then the ones for on + ordered_indices = np.argsort(timestamps) + ordered_timestamps = timestamps[ordered_indices] + ordered_labels = labels[ordered_indices] + + unique_labels = np.unique(ordered_labels) + label_to_index = {label: index for index, label in enumerate(unique_labels)} + data = [label_to_index[label] for label in ordered_labels] + + channel_name = channel_id.split("#")[-1] + description = f"On and Off Events from channel {channel_name}" + name = f"EventsNIDQDigitalChannel{channel_name}" + labeled_events = LabeledEvents( + name=name, description=description, timestamps=ordered_timestamps, data=data, labels=unique_labels + ) + nwbfile.add_acquisition(labeled_events) + def get_event_times_from_ttl(self, channel_name: str) -> np.ndarray: """ Return the start of event times from the rising part of TTL pulses on one of the NIDQ channels. 
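Reviewer note: the reworked `SpikeGLXNIDQInterface` above stops writing NIDQ traces as an `ElectricalSeries`; analog (XA/MA) channels go into acquisition as a single `TimeSeries` named `TimeSeriesNIDQ`, and digital (XD) channels become `ndx-events` `LabeledEvents`. A minimal usage sketch of that flow, assuming a local SpikeGLX run folder (the folder path and file identifiers below are placeholders, not from this PR):

from datetime import datetime
from zoneinfo import ZoneInfo

from pynwb import NWBFile

from neuroconv.datainterfaces import SpikeGLXNIDQInterface

# Placeholder path: the run folder containing the .nidq.bin/.nidq.meta pair.
interface = SpikeGLXNIDQInterface(folder_path="OneSession_g0")

nwbfile = NWBFile(
    session_description="NIDQ auxiliary channels example",
    identifier="nidq-demo",
    session_start_time=datetime(2024, 1, 1, tzinfo=ZoneInfo("US/Eastern")),
)

metadata = interface.get_metadata()

# Analog channels are written as one TimeSeries; digital channels, if present,
# are written as one LabeledEvents object per channel that actually has events.
interface.add_to_nwbfile(nwbfile=nwbfile, metadata=metadata)
print(list(nwbfile.acquisition))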
diff --git a/src/neuroconv/datainterfaces/ophys/baseimagingextractorinterface.py b/src/neuroconv/datainterfaces/ophys/baseimagingextractorinterface.py index 0019b8bd7..04407a3d4 100644 --- a/src/neuroconv/datainterfaces/ophys/baseimagingextractorinterface.py +++ b/src/neuroconv/datainterfaces/ophys/baseimagingextractorinterface.py @@ -47,6 +47,17 @@ def __init__( def get_metadata_schema( self, ) -> dict: + """ + Retrieve the metadata schema for the optical physiology (Ophys) data, with optional handling of photon series type. + + Parameters + ---------- + photon_series_type : {"OnePhotonSeries", "TwoPhotonSeries"}, optional + The type of photon series to include in the schema. If None, the value from the instance is used. + This argument is deprecated and will be removed in a future version. Set `photon_series_type` during + the initialization of the `BaseImagingExtractorInterface` instance. + + """ metadata_schema = super().get_metadata_schema() @@ -93,6 +104,16 @@ def get_metadata_schema( def get_metadata( self, ) -> DeepDict: + """ + Retrieve the metadata for the imaging data, with optional handling of photon series type. + + Parameters + ---------- + photon_series_type : {"OnePhotonSeries", "TwoPhotonSeries"}, optional + The type of photon series to include in the metadata. If None, the value from the instance is used. + This argument is deprecated and will be removed in a future version. Instead, set `photon_series_type` + during the initialization of the `BaseImagingExtractorInterface` instance. + """ from ...tools.roiextractors import get_nwb_imaging_metadata diff --git a/src/neuroconv/datainterfaces/ophys/basesegmentationextractorinterface.py b/src/neuroconv/datainterfaces/ophys/basesegmentationextractorinterface.py index 0f2e41bb9..66d35f57a 100644 --- a/src/neuroconv/datainterfaces/ophys/basesegmentationextractorinterface.py +++ b/src/neuroconv/datainterfaces/ophys/basesegmentationextractorinterface.py @@ -24,6 +24,27 @@ def __init__(self, verbose: bool = False, **source_data): self.segmentation_extractor = self.get_extractor()(**source_data) def get_metadata_schema(self) -> dict: + """ + Generate the metadata schema for Ophys data, updating required fields and properties. + + This method builds upon the base schema and customizes it for Ophys-specific metadata, including required + components such as devices, fluorescence data, imaging planes, and two-photon series. It also applies + temporary schema adjustments to handle certain use cases until a centralized metadata schema definition + is available. + + Returns + ------- + dict + A dictionary representing the updated Ophys metadata schema. + + Notes + ----- + - Ensures that `Device` and `ImageSegmentation` are marked as required. + - Updates various properties, including ensuring arrays for `ImagingPlane` and `TwoPhotonSeries`. + - Adjusts the schema for `Fluorescence`, including required fields and pattern properties. + - Adds schema definitions for `DfOverF`, segmentation images, and summary images. + - Applies temporary fixes, such as setting additional properties for `ImageSegmentation` to True. 
+ """ metadata_schema = super().get_metadata_schema() metadata_schema["required"] = ["Ophys"] metadata_schema["properties"]["Ophys"] = get_base_schema() diff --git a/src/neuroconv/datainterfaces/ophys/brukertiff/brukertiffconverter.py b/src/neuroconv/datainterfaces/ophys/brukertiff/brukertiffconverter.py index 2a67da720..1fc854b81 100644 --- a/src/neuroconv/datainterfaces/ophys/brukertiff/brukertiffconverter.py +++ b/src/neuroconv/datainterfaces/ophys/brukertiff/brukertiffconverter.py @@ -31,6 +31,7 @@ def get_source_schema(cls): return source_schema def get_conversion_options_schema(self): + """get the conversion options schema.""" interface_name = list(self.data_interface_objects.keys())[0] return self.data_interface_objects[interface_name].get_conversion_options_schema() @@ -91,6 +92,20 @@ def add_to_nwbfile( stub_test: bool = False, stub_frames: int = 100, ): + """ + Add data from multiple data interfaces to the specified NWBFile. + + Parameters + ---------- + nwbfile : NWBFile + The NWBFile object to which the data will be added. + metadata : dict + Metadata dictionary containing information to describe the data being added to the NWB file. + stub_test : bool, optional + If True, only a subset of the data (up to `stub_frames`) will be added for testing purposes. Default is False. + stub_frames : int, optional + The number of frames to include in the subset if `stub_test` is True. Default is 100. + """ for photon_series_index, (interface_name, data_interface) in enumerate(self.data_interface_objects.items()): data_interface.add_to_nwbfile( nwbfile=nwbfile, @@ -109,6 +124,24 @@ def run_conversion( stub_test: bool = False, stub_frames: int = 100, ) -> None: + """ + Run the conversion process for the instantiated data interfaces and add data to the NWB file. + + Parameters + ---------- + nwbfile_path : FilePath, optional + Path where the NWB file will be written. If None, the file will be handled in-memory. + nwbfile : NWBFile, optional + An in-memory NWBFile object. If None, a new NWBFile object will be created. + metadata : dict, optional + Metadata dictionary for describing the NWB file. If None, it will be auto-generated using the `get_metadata()` method. + overwrite : bool, optional + If True, overwrites the existing NWB file at `nwbfile_path`. If False, appends to the file (default is False). + stub_test : bool, optional + If True, only a subset of the data (up to `stub_frames`) will be added for testing purposes, by default False. + stub_frames : int, optional + The number of frames to include in the subset if `stub_test` is True, by default 100. + """ if metadata is None: metadata = self.get_metadata() @@ -141,6 +174,7 @@ def get_source_schema(cls): return get_json_schema_from_method_signature(cls) def get_conversion_options_schema(self): + """Get the conversion options schema.""" interface_name = list(self.data_interface_objects.keys())[0] return self.data_interface_objects[interface_name].get_conversion_options_schema() @@ -187,6 +221,21 @@ def add_to_nwbfile( stub_test: bool = False, stub_frames: int = 100, ): + """ + Add data from all instantiated data interfaces to the provided NWBFile. + + Parameters + ---------- + nwbfile : NWBFile + The NWBFile object to which the data will be added. + metadata : dict + Metadata dictionary containing information about the data to be added. + stub_test : bool, optional + If True, only a subset of the data (defined by `stub_frames`) will be added for testing purposes, + by default False. 
+ stub_frames : int, optional + The number of frames to include in the subset if `stub_test` is True, by default 100. + """ for photon_series_index, (interface_name, data_interface) in enumerate(self.data_interface_objects.items()): data_interface.add_to_nwbfile( nwbfile=nwbfile, @@ -205,6 +254,24 @@ def run_conversion( stub_test: bool = False, stub_frames: int = 100, ) -> None: + """ + Run the NWB conversion process for all instantiated data interfaces. + + Parameters + ---------- + nwbfile_path : FilePath, optional + The file path where the NWB file will be written. If None, the file is handled in-memory. + nwbfile : NWBFile, optional + An existing in-memory NWBFile object. If None, a new NWBFile object will be created. + metadata : dict, optional + Metadata dictionary used to create or validate the NWBFile. If None, metadata is automatically generated. + overwrite : bool, optional + If True, the NWBFile at `nwbfile_path` is overwritten if it exists. If False (default), data is appended. + stub_test : bool, optional + If True, only a subset of the data (up to `stub_frames`) is used for testing purposes. By default False. + stub_frames : int, optional + The number of frames to include in the subset if `stub_test` is True. By default 100. + """ if metadata is None: metadata = self.get_metadata() diff --git a/src/neuroconv/datainterfaces/ophys/brukertiff/brukertiffdatainterface.py b/src/neuroconv/datainterfaces/ophys/brukertiff/brukertiffdatainterface.py index 9742711e1..f7e7bee1b 100644 --- a/src/neuroconv/datainterfaces/ophys/brukertiff/brukertiffdatainterface.py +++ b/src/neuroconv/datainterfaces/ophys/brukertiff/brukertiffdatainterface.py @@ -16,6 +16,7 @@ class BrukerTiffMultiPlaneImagingInterface(BaseImagingExtractorInterface): @classmethod def get_source_schema(cls) -> dict: + """Get the source schema for the Bruker TIFF imaging data.""" source_schema = super().get_source_schema() source_schema["properties"]["folder_path"][ "description" @@ -28,6 +29,23 @@ def get_streams( folder_path: DirectoryPath, plane_separation_type: Literal["contiguous", "disjoint"] = None, ) -> dict: + """ + Get streams for the Bruker TIFF imaging data. + + Parameters + ---------- + folder_path : DirectoryPath + Path to the folder containing the Bruker TIFF files. + plane_separation_type : Literal["contiguous", "disjoint"], optional + Type of plane separation to apply. If "contiguous", only the first plane stream for each channel is retained. + + Returns + ------- + dict + A dictionary containing the streams for the Bruker TIFF imaging data. The dictionary has the following keys: + - "channel_streams": List of channel stream names. + - "plane_streams": Dictionary where keys are channel stream names and values are lists of plane streams. 
+ """ from roiextractors import BrukerTiffMultiPlaneImagingExtractor streams = BrukerTiffMultiPlaneImagingExtractor.get_streams(folder_path=folder_path) @@ -117,6 +135,7 @@ def _determine_position_current(self) -> list[float]: return position_values def get_metadata(self) -> DeepDict: + """get metadata for the Bruker TIFF imaging data.""" metadata = super().get_metadata() xml_metadata = self.imaging_extractor.xml_metadata @@ -183,6 +202,7 @@ class BrukerTiffSinglePlaneImagingInterface(BaseImagingExtractorInterface): @classmethod def get_source_schema(cls) -> dict: + """Get the source schema for the Bruker TIFF imaging data.""" source_schema = super().get_source_schema() source_schema["properties"]["folder_path"][ "description" @@ -191,6 +211,19 @@ def get_source_schema(cls) -> dict: @classmethod def get_streams(cls, folder_path: DirectoryPath) -> dict: + """ + Get streams for the Bruker TIFF imaging data. + + Parameters + ---------- + folder_path : DirectoryPath + Path to the folder containing the Bruker TIFF files. + + Returns + ------- + dict + A dictionary containing the streams extracted from the Bruker TIFF files. + """ from roiextractors import BrukerTiffMultiPlaneImagingExtractor streams = BrukerTiffMultiPlaneImagingExtractor.get_streams(folder_path=folder_path) @@ -263,6 +296,7 @@ def _determine_position_current(self) -> list[float]: return position_values def get_metadata(self) -> DeepDict: + """get metadata for the Bruker TIFF imaging data.""" metadata = super().get_metadata() xml_metadata = self.imaging_extractor.xml_metadata diff --git a/src/neuroconv/datainterfaces/ophys/caiman/caimandatainterface.py b/src/neuroconv/datainterfaces/ophys/caiman/caimandatainterface.py index 386c03d3c..802645139 100644 --- a/src/neuroconv/datainterfaces/ophys/caiman/caimandatainterface.py +++ b/src/neuroconv/datainterfaces/ophys/caiman/caimandatainterface.py @@ -12,6 +12,7 @@ class CaimanSegmentationInterface(BaseSegmentationExtractorInterface): @classmethod def get_source_schema(cls) -> dict: + """Get the source schema for the Caiman segmentation interface.""" source_metadata = super().get_source_schema() source_metadata["properties"]["file_path"]["description"] = "Path to .hdf5 file." return source_metadata diff --git a/src/neuroconv/datainterfaces/ophys/micromanagertiff/micromanagertiffdatainterface.py b/src/neuroconv/datainterfaces/ophys/micromanagertiff/micromanagertiffdatainterface.py index 17cbc95ed..5373b7004 100644 --- a/src/neuroconv/datainterfaces/ophys/micromanagertiff/micromanagertiffdatainterface.py +++ b/src/neuroconv/datainterfaces/ophys/micromanagertiff/micromanagertiffdatainterface.py @@ -13,6 +13,7 @@ class MicroManagerTiffImagingInterface(BaseImagingExtractorInterface): @classmethod def get_source_schema(cls) -> dict: + """get the source schema for the Micro-Manager TIFF imaging interface.""" source_schema = super().get_source_schema() source_schema["properties"]["folder_path"]["description"] = "The folder containing the OME-TIF image files." 
@@ -37,6 +38,7 @@ def __init__(self, folder_path: DirectoryPath, verbose: bool = True): self.imaging_extractor._channel_names = [f"OpticalChannel{channel_name}"] def get_metadata(self) -> dict: + """Get metadata for the Micro-Manager TIFF imaging data.""" metadata = super().get_metadata() micromanager_metadata = self.imaging_extractor.micromanager_metadata diff --git a/src/neuroconv/datainterfaces/ophys/miniscope/miniscopeconverter.py b/src/neuroconv/datainterfaces/ophys/miniscope/miniscopeconverter.py index d1a0fb701..59424d7a5 100644 --- a/src/neuroconv/datainterfaces/ophys/miniscope/miniscopeconverter.py +++ b/src/neuroconv/datainterfaces/ophys/miniscope/miniscopeconverter.py @@ -61,6 +61,7 @@ def __init__(self, folder_path: DirectoryPath, verbose: bool = True): ) def get_conversion_options_schema(self) -> dict: + """get the conversion options schema.""" return self.data_interface_objects["MiniscopeImaging"].get_conversion_options_schema() def add_to_nwbfile( @@ -70,6 +71,21 @@ def add_to_nwbfile( stub_test: bool = False, stub_frames: int = 100, ): + """ + Add Miniscope imaging and behavioral camera data to the specified NWBFile. + + Parameters + ---------- + nwbfile : NWBFile + The NWBFile object to which the imaging and behavioral data will be added. + metadata : dict + Metadata dictionary containing information about the imaging and behavioral recordings. + stub_test : bool, optional + If True, only a subset of the data (defined by `stub_frames`) will be added for testing purposes, + by default False. + stub_frames : int, optional + The number of frames to include in the subset if `stub_test` is True, by default 100. + """ self.data_interface_objects["MiniscopeImaging"].add_to_nwbfile( nwbfile=nwbfile, metadata=metadata, @@ -90,6 +106,25 @@ def run_conversion( stub_test: bool = False, stub_frames: int = 100, ) -> None: + """ + Run the NWB conversion process for the instantiated data interfaces. + + Parameters + ---------- + nwbfile_path : str, optional + Path where the NWBFile will be written. If None, the file is handled in-memory. + nwbfile : NWBFile, optional + An in-memory NWBFile object to be written to the file. If None, a new NWBFile is created. + metadata : dict, optional + Metadata dictionary with information to create the NWBFile. If None, metadata is auto-generated. + overwrite : bool, optional + If True, overwrites the existing NWBFile at `nwbfile_path`. If False (default), data is appended. + stub_test : bool, optional + If True, only a subset of the data (up to `stub_frames`) is written for testing purposes, + by default False. + stub_frames : int, optional + The number of frames to include in the subset if `stub_test` is True, by default 100. 
+ """ if metadata is None: metadata = self.get_metadata() diff --git a/src/neuroconv/datainterfaces/ophys/miniscope/miniscopeimagingdatainterface.py b/src/neuroconv/datainterfaces/ophys/miniscope/miniscopeimagingdatainterface.py index 64a180c46..5a1f6d521 100644 --- a/src/neuroconv/datainterfaces/ophys/miniscope/miniscopeimagingdatainterface.py +++ b/src/neuroconv/datainterfaces/ophys/miniscope/miniscopeimagingdatainterface.py @@ -19,6 +19,7 @@ class MiniscopeImagingInterface(BaseImagingExtractorInterface): @classmethod def get_source_schema(cls) -> dict: + """Get the source schema for the Miniscope imaging interface.""" source_schema = super().get_source_schema() source_schema["properties"]["folder_path"][ "description" @@ -49,6 +50,7 @@ def __init__(self, folder_path: DirectoryPath): self.photon_series_type = "OnePhotonSeries" def get_metadata(self) -> DeepDict: + """Get metadata for the Miniscope imaging data.""" from ....tools.roiextractors import get_nwb_imaging_metadata metadata = super().get_metadata() @@ -74,6 +76,7 @@ def get_metadata(self) -> DeepDict: return metadata def get_metadata_schema(self) -> dict: + """Get the metadata schema for the Miniscope imaging data.""" metadata_schema = super().get_metadata_schema() metadata_schema["properties"]["Ophys"]["definitions"]["Device"]["additionalProperties"] = True return metadata_schema @@ -92,6 +95,23 @@ def add_to_nwbfile( stub_test: bool = False, stub_frames: int = 100, ): + """ + Add imaging data to the specified NWBFile, including device and photon series information. + + Parameters + ---------- + nwbfile : NWBFile + The NWBFile object to which the imaging data will be added. + metadata : dict, optional + Metadata containing information about the imaging device and photon series. If None, default metadata is used. + photon_series_type : {"TwoPhotonSeries", "OnePhotonSeries"}, optional + The type of photon series to be added, either "TwoPhotonSeries" or "OnePhotonSeries", by default "OnePhotonSeries". + stub_test : bool, optional + If True, only a subset of the data (defined by `stub_frames`) will be added for testing purposes, + by default False. + stub_frames : int, optional + The number of frames to include if `stub_test` is True, by default 100. 
+ """ from ndx_miniscope.utils import add_miniscope_device from ....tools.roiextractors import add_photon_series_to_nwbfile diff --git a/src/neuroconv/datainterfaces/ophys/sbx/sbxdatainterface.py b/src/neuroconv/datainterfaces/ophys/sbx/sbxdatainterface.py index 554cc5aba..49e556d06 100644 --- a/src/neuroconv/datainterfaces/ophys/sbx/sbxdatainterface.py +++ b/src/neuroconv/datainterfaces/ophys/sbx/sbxdatainterface.py @@ -37,6 +37,7 @@ def __init__( ) def get_metadata(self) -> dict: + """Get metadata for the Scanbox imaging data.""" metadata = super().get_metadata() metadata["Ophys"]["Device"][0]["description"] = "Scanbox imaging" return metadata diff --git a/src/neuroconv/datainterfaces/ophys/scanimage/scanimageimaginginterfaces.py b/src/neuroconv/datainterfaces/ophys/scanimage/scanimageimaginginterfaces.py index c74161e55..7d9d7003b 100644 --- a/src/neuroconv/datainterfaces/ophys/scanimage/scanimageimaginginterfaces.py +++ b/src/neuroconv/datainterfaces/ophys/scanimage/scanimageimaginginterfaces.py @@ -28,6 +28,7 @@ class ScanImageImagingInterface(BaseImagingExtractorInterface): @classmethod def get_source_schema(cls) -> dict: + """Get the source schema for the ScanImage imaging interface.""" source_schema = super().get_source_schema() source_schema["properties"]["file_path"]["description"] = "Path to Tiff file." return source_schema @@ -139,6 +140,7 @@ def __init__( super().__init__(file_path=file_path, fallback_sampling_frequency=fallback_sampling_frequency, verbose=verbose) def get_metadata(self) -> dict: + """get metadata for the ScanImage imaging data""" device_number = 0 # Imaging plane metadata is a list with metadata for each plane metadata = super().get_metadata() @@ -174,6 +176,7 @@ class ScanImageMultiFileImagingInterface(BaseImagingExtractorInterface): @classmethod def get_source_schema(cls) -> dict: + """get the source schema for the ScanImage multi-file imaging interface.""" source_schema = super().get_source_schema() source_schema["properties"]["folder_path"]["description"] = "Path to the folder containing the TIFF files." 
return source_schema @@ -304,6 +307,7 @@ def __init__( ) def get_metadata(self) -> dict: + """get metadata for the ScanImage imaging data""" metadata = super().get_metadata() extracted_session_start_time = datetime.datetime.strptime( @@ -421,6 +425,7 @@ def __init__( ) def get_metadata(self) -> dict: + """get metadata for the ScanImage imaging data""" metadata = super().get_metadata() extracted_session_start_time = datetime.datetime.strptime( @@ -548,6 +553,7 @@ def __init__( ) def get_metadata(self) -> dict: + """get metadata for the ScanImage imaging data""" metadata = super().get_metadata() extracted_session_start_time = datetime.datetime.strptime( @@ -677,6 +683,7 @@ def __init__( ) def get_metadata(self) -> dict: + """get metadata for the ScanImage imaging data""" metadata = super().get_metadata() extracted_session_start_time = datetime.datetime.strptime( diff --git a/src/neuroconv/datainterfaces/ophys/suite2p/suite2pdatainterface.py b/src/neuroconv/datainterfaces/ophys/suite2p/suite2pdatainterface.py index 056616ce5..8a3f876c2 100644 --- a/src/neuroconv/datainterfaces/ophys/suite2p/suite2pdatainterface.py +++ b/src/neuroconv/datainterfaces/ophys/suite2p/suite2pdatainterface.py @@ -50,6 +50,7 @@ class Suite2pSegmentationInterface(BaseSegmentationExtractorInterface): @classmethod def get_source_schema(cls) -> dict: + """Get the source schema for the Suite2p segmentation interface.""" schema = super().get_source_schema() schema["properties"]["folder_path"][ "description" @@ -113,6 +114,7 @@ def __init__( self.verbose = verbose def get_metadata(self) -> DeepDict: + """get metadata for the Suite2p segmentation data""" metadata = super().get_metadata() # No need to update the metadata links for the default plane segmentation name @@ -140,6 +142,40 @@ def add_to_nwbfile( iterator_options: Optional[dict] = None, compression_options: Optional[dict] = None, ): + """ + Add segmentation data to the specified NWBFile. + + Parameters + ---------- + nwbfile : NWBFile + The NWBFile object to which the segmentation data will be added. + metadata : dict, optional + Metadata containing information about the segmentation. If None, default metadata is used. + stub_test : bool, optional + If True, only a subset of the data (defined by `stub_frames`) will be added for testing purposes, + by default False. + stub_frames : int, optional + The number of frames to include in the subset if `stub_test` is True, by default 100. + include_roi_centroids : bool, optional + Whether to include the centroids of regions of interest (ROIs) in the data, by default True. + include_roi_acceptance : bool, optional + Whether to include acceptance status of ROIs, by default True. + mask_type : str, default: 'image' + There are three types of ROI masks in NWB, 'image', 'pixel', and 'voxel'. + + * 'image' masks have the same shape as the reference images the segmentation was applied to, and weight each pixel + by its contribution to the ROI (typically boolean, with 0 meaning 'not in the ROI'). + * 'pixel' masks are instead indexed by ROI, with the data at each index being the shape of the image by the number + of pixels in each ROI. + * 'voxel' masks are instead indexed by ROI, with the data at each index being the shape of the volume by the number + of voxels in each ROI. + + Specify your choice between these two as mask_type='image', 'pixel', 'voxel', or None. + plane_segmentation_name : str, optional + The name of the plane segmentation object, by default None. 
+        iterator_options : dict, optional
+            Additional options for iterating over the data, by default None.
+        """
         super().add_to_nwbfile(
             nwbfile=nwbfile,
             metadata=metadata,
diff --git a/src/neuroconv/datainterfaces/ophys/tdt_fp/tdtfiberphotometrydatainterface.py b/src/neuroconv/datainterfaces/ophys/tdt_fp/tdtfiberphotometrydatainterface.py
index 8b092464e..0c6b90aea 100644
--- a/src/neuroconv/datainterfaces/ophys/tdt_fp/tdtfiberphotometrydatainterface.py
+++ b/src/neuroconv/datainterfaces/ophys/tdt_fp/tdtfiberphotometrydatainterface.py
@@ -47,6 +47,7 @@ def __init__(self, folder_path: DirectoryPath, verbose: bool = True):
         import ndx_fiber_photometry  # noqa: F401

     def get_metadata(self) -> DeepDict:
+        """Get metadata for the TDTFiberPhotometryInterface."""
         metadata = super().get_metadata()
         tdt_photometry = self.load(evtype=["scalars"])  # This evtype quickly loads info without loading all the data.
         start_timestamp = tdt_photometry.info.start_date.timestamp()
@@ -55,6 +56,7 @@ def get_metadata(self) -> DeepDict:
         return metadata

     def get_metadata_schema(self) -> dict:
+        """Get the metadata schema for the TDTFiberPhotometryInterface."""
         metadata_schema = super().get_metadata_schema()
         return metadata_schema

diff --git a/src/neuroconv/datainterfaces/ophys/tiff/tiffdatainterface.py b/src/neuroconv/datainterfaces/ophys/tiff/tiffdatainterface.py
index 1eaa3b55e..ce98561de 100644
--- a/src/neuroconv/datainterfaces/ophys/tiff/tiffdatainterface.py
+++ b/src/neuroconv/datainterfaces/ophys/tiff/tiffdatainterface.py
@@ -14,6 +14,7 @@ class TiffImagingInterface(BaseImagingExtractorInterface):

     @classmethod
     def get_source_schema(cls) -> dict:
+        """Get the source schema for the TIFF imaging interface."""
         source_schema = super().get_source_schema()
         source_schema["properties"]["file_path"]["description"] = "Path to Tiff file."
         return source_schema
diff --git a/src/neuroconv/datainterfaces/text/timeintervalsinterface.py b/src/neuroconv/datainterfaces/text/timeintervalsinterface.py
index 5f5b1107d..a1de63a07 100644
--- a/src/neuroconv/datainterfaces/text/timeintervalsinterface.py
+++ b/src/neuroconv/datainterfaces/text/timeintervalsinterface.py
@@ -24,16 +24,20 @@ def __init__(
         verbose: bool = True,
     ):
         """
+        Initialize the TimeIntervalsInterface.
+
         Parameters
         ----------
         file_path : FilePath
+            The path to the file containing time intervals data.
         read_kwargs : dict, optional
-        verbose : bool, default: True
+            Additional arguments for reading the file, by default None.
+        verbose : bool, optional
+            If True, provides verbose output, by default True.
         """
         read_kwargs = read_kwargs or dict()
         super().__init__(file_path=file_path)
         self.verbose = verbose
-
         self._read_kwargs = read_kwargs
         self.dataframe = self._read_file(file_path, **read_kwargs)
         self.time_intervals = None
@@ -50,22 +54,74 @@ def get_metadata(self) -> dict:
         return metadata

     def get_metadata_schema(self) -> dict:
+        """
+        Get the metadata schema for the time intervals.
+
+        Returns
+        -------
+        dict
+            The schema dictionary for time intervals metadata.
+        """
         fpath = Path(__file__).parent.parent.parent / "schemas" / "timeintervals_schema.json"
         return load_dict_from_file(fpath)

     def get_original_timestamps(self, column: str) -> np.ndarray:
+        """
+        Get the original timestamps for a given column.
+
+        Parameters
+        ----------
+        column : str
+            The name of the column containing timestamps.
+
+        Returns
+        -------
+        np.ndarray
+            The original timestamps from the specified column.
+
+        Raises
+        ------
+        ValueError
+            If the column name does not end with '_time'.
+ """ if not column.endswith("_time"): raise ValueError("Timing columns on a TimeIntervals table need to end with '_time'!") return self._read_file(**self.source_data, **self._read_kwargs)[column].values def get_timestamps(self, column: str) -> np.ndarray: + """ + Get the current timestamps for a given column. + + Parameters + ---------- + column : str + The name of the column containing timestamps. + + Returns + ------- + np.ndarray + The current timestamps from the specified column. + + Raises + ------ + ValueError + If the column name does not end with '_time'. + """ if not column.endswith("_time"): raise ValueError("Timing columns on a TimeIntervals table need to end with '_time'!") return self.dataframe[column].values def set_aligned_starting_time(self, aligned_starting_time: float): + """ + Align the starting time by shifting all timestamps by the given value. + + Parameters + ---------- + aligned_starting_time : float + The aligned starting time to shift all timestamps by. + """ timing_columns = [column for column in self.dataframe.columns if column.endswith("_time")] for column in timing_columns: @@ -74,6 +130,23 @@ def set_aligned_starting_time(self, aligned_starting_time: float): def set_aligned_timestamps( self, aligned_timestamps: np.ndarray, column: str, interpolate_other_columns: bool = False ): + """ + Set aligned timestamps for the given column and optionally interpolate other columns. + + Parameters + ---------- + aligned_timestamps : np.ndarray + The aligned timestamps to set for the given column. + column : str + The name of the column to update with the aligned timestamps. + interpolate_other_columns : bool, optional + If True, interpolate the timestamps in other columns, by default False. + + Raises + ------ + ValueError + If the column name does not end with '_time'. + """ if not column.endswith("_time"): raise ValueError("Timing columns on a TimeIntervals table need to end with '_time'!") @@ -96,6 +169,18 @@ def set_aligned_timestamps( ) def align_by_interpolation(self, unaligned_timestamps: np.ndarray, aligned_timestamps: np.ndarray, column: str): + """ + Align timestamps using linear interpolation. + + Parameters + ---------- + unaligned_timestamps : np.ndarray + The original unaligned timestamps that map to the aligned timestamps. + aligned_timestamps : np.ndarray + The target aligned timestamps corresponding to the unaligned timestamps. + column : str + The name of the column containing the timestamps to be aligned. + """ current_timestamps = self.get_timestamps(column=column) assert ( current_timestamps[1] >= unaligned_timestamps[0] diff --git a/src/neuroconv/nwbconverter.py b/src/neuroconv/nwbconverter.py index fe1b09915..2d70cf8ee 100644 --- a/src/neuroconv/nwbconverter.py +++ b/src/neuroconv/nwbconverter.py @@ -177,7 +177,21 @@ def create_nwbfile(self, metadata: Optional[dict] = None, conversion_options: Op self.add_to_nwbfile(nwbfile=nwbfile, metadata=metadata, conversion_options=conversion_options) return nwbfile - def add_to_nwbfile(self, nwbfile: NWBFile, metadata, conversion_options: Optional[dict] = None) -> None: + def add_to_nwbfile(self, nwbfile: NWBFile, metadata, conversion_options: Optional[dict] = None): + """ + Add data from the instantiated data interfaces to the given NWBFile. + + Parameters + ---------- + nwbfile : NWBFile + The NWB file object to which the data from the data interfaces will be added. + metadata : dict + The metadata dictionary that contains information used to describe the data. 
+ conversion_options : dict, optional + A dictionary containing conversion options for each interface, where non-default behavior is requested. + Each key corresponds to a data interface name, and the values are dictionaries with options for that interface. + By default, None. + """ conversion_options = conversion_options or dict() for interface_name, data_interface in self.data_interface_objects.items(): data_interface.add_to_nwbfile( diff --git a/src/neuroconv/schemas/yaml_conversion_specification_schema.json b/src/neuroconv/schemas/yaml_conversion_specification_schema.json index c6526803b..039a1cf48 100644 --- a/src/neuroconv/schemas/yaml_conversion_specification_schema.json +++ b/src/neuroconv/schemas/yaml_conversion_specification_schema.json @@ -8,6 +8,7 @@ "required": ["experiments"], "additionalProperties": false, "properties": { + "upload_to_dandiset": {"type": "string"}, "metadata": {"$ref": "./metadata_schema.json#"}, "conversion_options": {"type": "object"}, "data_interfaces": { diff --git a/src/neuroconv/tools/aws/__init__.py b/src/neuroconv/tools/aws/__init__.py index 88144fb01..70a42cbf5 100644 --- a/src/neuroconv/tools/aws/__init__.py +++ b/src/neuroconv/tools/aws/__init__.py @@ -1,4 +1,9 @@ from ._submit_aws_batch_job import submit_aws_batch_job from ._rclone_transfer_batch_job import rclone_transfer_batch_job +from ._deploy_neuroconv_batch_job import deploy_neuroconv_batch_job -__all__ = ["submit_aws_batch_job", "rclone_transfer_batch_job"] +__all__ = [ + "submit_aws_batch_job", + "rclone_transfer_batch_job", + "deploy_neuroconv_batch_job", +] diff --git a/src/neuroconv/tools/aws/_deploy_neuroconv_batch_job.py b/src/neuroconv/tools/aws/_deploy_neuroconv_batch_job.py new file mode 100644 index 000000000..1df86d957 --- /dev/null +++ b/src/neuroconv/tools/aws/_deploy_neuroconv_batch_job.py @@ -0,0 +1,241 @@ +"""Collection of helper functions for deploying NeuroConv in EC2 Batch jobs on AWS.""" + +import os +import time +import uuid +import warnings +from typing import Optional + +import boto3 +from pydantic import FilePath, validate_call + +from ._rclone_transfer_batch_job import rclone_transfer_batch_job +from ._submit_aws_batch_job import submit_aws_batch_job + +_RETRY_STATES = ["RUNNABLE", "PENDING", "STARTING", "RUNNING"] + + +@validate_call +def deploy_neuroconv_batch_job( + *, + rclone_command: str, + yaml_specification_file_path: FilePath, + job_name: str, + efs_volume_name: str, + rclone_config_file_path: Optional[FilePath] = None, + status_tracker_table_name: str = "neuroconv_batch_status_tracker", + compute_environment_name: str = "neuroconv_batch_environment", + job_queue_name: str = "neuroconv_batch_queue", + job_definition_name: Optional[str] = None, + minimum_worker_ram_in_gib: int = 16, # Higher than previous recommendations for safer buffering room + minimum_worker_cpus: int = 4, + region: Optional[str] = None, +) -> dict[str, str]: + """ + Submit a job to AWS Batch for processing. + + Requires AWS credentials saved to files in the `~/.aws/` folder or set as environment variables. + + Parameters + ---------- + rclone_command : str + The command to pass directly to Rclone running on the EC2 instance. + E.g.: "rclone copy my_drive:testing_rclone /mnt/efs/source" + Must move data from or to '/mnt/efs/source'. + yaml_specification_file_path : FilePath + The path to the YAML file containing the NeuroConv specification. + job_name : str + The name of the job to submit. + efs_volume_name : str + The name of an EFS volume to be created and attached to the job. 
+        The path exposed to the container will always be `/mnt/efs`.
+    rclone_config_file_path : FilePath, optional
+        The path to the Rclone configuration file to use for the job.
+        If unspecified, the method will attempt to find the file in `~/.rclone` and will raise an error if it cannot.
+    status_tracker_table_name : str, default: "neuroconv_batch_status_tracker"
+        The name of the DynamoDB table to use for tracking job status.
+    compute_environment_name : str, default: "neuroconv_batch_environment"
+        The name of the compute environment to use for the job.
+    job_queue_name : str, default: "neuroconv_batch_queue"
+        The name of the job queue to use for the job.
+    job_definition_name : str, optional
+        The name of the job definition to use for the job.
+        If unspecified, a name starting with 'neuroconv_batch_' will be generated.
+    minimum_worker_ram_in_gib : int, default: 16
+        The minimum amount of base worker memory required to run this job.
+        Determines the EC2 instance type selected by the automatic 'best fit' selector.
+        Recommended to be several GiB to allow comfortable buffer space for data chunk iterators.
+    minimum_worker_cpus : int, default: 4
+        The minimum number of CPUs required to run this job.
+        A minimum of 4 is required, even if only one will be used in the actual process.
+    region : str, optional
+        The AWS region to use for the job.
+        If not provided, we will attempt to load the region from your local AWS configuration.
+        If that file is not found on your system, we will default to "us-east-2", the location of the DANDI Archive.
+
+    Returns
+    -------
+    info : dict
+        A dictionary containing information about this AWS Batch job.
+
+        info["rclone_job_submission_info"] is the return value of `neuroconv.tools.aws.rclone_transfer_batch_job`.
+        info["neuroconv_job_submission_info"] is the return value of `neuroconv.tools.aws.submit_aws_batch_job`.
+    """
+    efs_volume_name = efs_volume_name or f"neuroconv_batch_efs_volume_{uuid.uuid4().hex[:4]}"
+    region = region or "us-east-2"
+
+    if "/mnt/efs/source" not in rclone_command:
+        message = (
+            f"The Rclone command '{rclone_command}' does not contain a reference to '/mnt/efs/source'. "
+            "Without utilizing the EFS mount, the instance is unlikely to have enough local disk space. "
+            "The subfolder 'source' is also required to eliminate ambiguity in the transfer process."
+        )
+        raise ValueError(message)
+
+    rclone_job_name = f"{job_name}_rclone_transfer"
+    rclone_job_submission_info = rclone_transfer_batch_job(
+        rclone_command=rclone_command,
+        job_name=rclone_job_name,
+        efs_volume_name=efs_volume_name,
+        rclone_config_file_path=rclone_config_file_path,
+        region=region,
+    )
+    rclone_job_id = rclone_job_submission_info["job_submission_info"]["jobId"]
+
+    # Give the EFS and other aspects time to spin up before submitting next dependent job
+    # (Otherwise, good chance that duplicate EFS will be created)
+    aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", None)
+    aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", None)
+
+    batch_client = boto3.client(
+        service_name="batch",
+        region_name=region,
+        aws_access_key_id=aws_access_key_id,
+        aws_secret_access_key=aws_secret_access_key,
+    )
+    efs_client = boto3.client(
+        service_name="efs",
+        region_name=region,
+        aws_access_key_id=aws_access_key_id,
+        aws_secret_access_key=aws_secret_access_key,
+    )
+
+    available_efs_volumes = efs_client.describe_file_systems()
+    matching_efs_volumes = [
+        file_system
+        for file_system in available_efs_volumes["FileSystems"]
+        for tag in file_system["Tags"]
+        if tag["Key"] == "Name" and tag["Value"] == efs_volume_name
+    ]
+    max_iterations = 10
+    iteration = 0
+    while len(matching_efs_volumes) == 0 and iteration < max_iterations:
+        iteration += 1
+        time.sleep(30)
+
+        # Re-fetch the available file systems on each retry so a newly created EFS volume can be detected
+        available_efs_volumes = efs_client.describe_file_systems()
+        matching_efs_volumes = [
+            file_system
+            for file_system in available_efs_volumes["FileSystems"]
+            for tag in file_system["Tags"]
+            if tag["Key"] == "Name" and tag["Value"] == efs_volume_name
+        ]
+
+    if len(matching_efs_volumes) == 0:
+        message = f"Unable to create EFS volume '{efs_volume_name}' after {max_iterations} attempts!"
+        raise ValueError(message)
+
+    docker_image = "ghcr.io/catalystneuro/neuroconv_yaml_variable:latest"
+
+    with open(file=yaml_specification_file_path, mode="r") as io:
+        yaml_specification_file_stream = io.read()
+
+    neuroconv_job_name = f"{job_name}_neuroconv_deployment"
+    job_dependencies = [{"jobId": rclone_job_id, "type": "SEQUENTIAL"}]
+    neuroconv_job_submission_info = submit_aws_batch_job(
+        job_name=neuroconv_job_name,
+        docker_image=docker_image,
+        environment_variables={
+            "NEUROCONV_YAML": yaml_specification_file_stream,
+            "NEUROCONV_DATA_PATH": "/mnt/efs/source",
+            # TODO: would prefer this to use subfolders for source and output, but need logic for YAML
+            # related code to create them if missing (hard to send EFS this command directly)
+            # (the code was included in this PR, but a release cycle needs to complete for the docker images before
+            # it can be used here)
+            # "NEUROCONV_OUTPUT_PATH": "/mnt/efs/output",
+            "NEUROCONV_OUTPUT_PATH": "/mnt/efs",
+        },
+        efs_volume_name=efs_volume_name,
+        job_dependencies=job_dependencies,
+        status_tracker_table_name=status_tracker_table_name,
+        compute_environment_name=compute_environment_name,
+        job_queue_name=job_queue_name,
+        job_definition_name=job_definition_name,
+        minimum_worker_ram_in_gib=minimum_worker_ram_in_gib,
+        minimum_worker_cpus=minimum_worker_cpus,
+        region=region,
+    )
+
+    info = {
+        "rclone_job_submission_info": rclone_job_submission_info,
+        "neuroconv_job_submission_info": neuroconv_job_submission_info,
+    }
+
+    # TODO: would be better to spin up third dependent job to clean up EFS volume after neuroconv job completes
+    neuroconv_job_id = neuroconv_job_submission_info["job_submission_info"]["jobId"]
+    job = None
+    max_retries = 60 * 12  # roughly 12 hours max runtime (aside from internet loss) for checking
cleanup + sleep_time = 60 # 1 minute + retry = 0.0 + time.sleep(sleep_time) + while retry < max_retries: + job_description_response = batch_client.describe_jobs(jobs=[neuroconv_job_id]) + if job_description_response["ResponseMetadata"]["HTTPStatusCode"] == 200: + # sleep but only increment retry by a small amount + # (really should only apply if internet connection is temporarily lost) + retry += 0.1 + time.sleep(sleep_time) + + job = job_description_response["jobs"][0] + if job["status"] in _RETRY_STATES: + retry += 1.0 + time.sleep(sleep_time) + elif job["status"] == "SUCCEEDED": + break + + if retry >= max_retries: + message = ( + "Maximum retries reached for checking job completion for automatic EFS cleanup! " + "Please delete the EFS volume manually." + ) + warnings.warn(message=message, stacklevel=2) + + return info + + # Cleanup EFS after job is complete - must clear mount targets first, then wait before deleting the volume + efs_volumes = efs_client.describe_file_systems() + matching_efs_volumes = [ + file_system + for file_system in efs_volumes["FileSystems"] + for tag in file_system["Tags"] + if tag["Key"] == "Name" and tag["Value"] == efs_volume_name + ] + if len(matching_efs_volumes) != 1: + message = ( + f"Expected to find exactly one EFS volume with name '{efs_volume_name}', " + f"but found {len(matching_efs_volumes)}\n\n{matching_efs_volumes=}\n\n!" + "You will have to delete these manually." + ) + warnings.warn(message=message, stacklevel=2) + + return info + + efs_volume = matching_efs_volumes[0] + efs_id = efs_volume["FileSystemId"] + mount_targets = efs_client.describe_mount_targets(FileSystemId=efs_id) + for mount_target in mount_targets["MountTargets"]: + efs_client.delete_mount_target(MountTargetId=mount_target["MountTargetId"]) + + time.sleep(sleep_time) + efs_client.delete_file_system(FileSystemId=efs_id) + + return info diff --git a/src/neuroconv/tools/aws/_submit_aws_batch_job.py b/src/neuroconv/tools/aws/_submit_aws_batch_job.py index 748f25399..cae25f3ce 100644 --- a/src/neuroconv/tools/aws/_submit_aws_batch_job.py +++ b/src/neuroconv/tools/aws/_submit_aws_batch_job.py @@ -464,11 +464,14 @@ def _create_or_get_efs_id( if tag["Key"] == "Name" and tag["Value"] == efs_volume_name ] - if len(matching_efs_volumes) > 1: + if len(matching_efs_volumes) == 1: efs_volume = matching_efs_volumes[0] efs_id = efs_volume["FileSystemId"] return efs_id + elif len(matching_efs_volumes) > 1: + message = f"Multiple EFS volumes with the name '{efs_volume_name}' were found!\n\n{matching_efs_volumes=}\n" + raise ValueError(message) # Existing volume not found - must create a fresh one and set mount targets on it efs_volume = efs_client.create_file_system( @@ -506,7 +509,7 @@ def _create_or_get_efs_id( return efs_id -def _generate_job_definition_name( +def generate_job_definition_name( *, docker_image: str, minimum_worker_ram_in_gib: int, @@ -515,9 +518,7 @@ def _generate_job_definition_name( ) -> str: # pragma: no cover """ Generate a job definition name for the AWS Batch job. - Note that Docker images don't strictly require a tag to be pulled or used - 'latest' is always used by default. - Parameters ---------- docker_image : str @@ -529,15 +530,13 @@ def _generate_job_definition_name( minimum_worker_cpus : int The minimum number of CPUs required to run this job. A minimum of 4 is required, even if only one will be used in the actual process. + efs_id : Optional[str] + The ID of the EFS filesystem to mount, if any. 
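+
+    Returns
+    -------
+    str
+        The generated job definition name.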
""" - docker_tags = docker_image.split(":")[1:] - docker_tag = docker_tags[0] if len(docker_tags) > 1 else None - # AWS Batch does not allow colons, slashes, or periods in job definition names parsed_docker_image_name = str(docker_image) - for disallowed_character in [":", r"/", "."]: + for disallowed_character in [":", "/", r"/", "."]: parsed_docker_image_name = parsed_docker_image_name.replace(disallowed_character, "-") - job_definition_name = f"neuroconv_batch" job_definition_name += f"_{parsed_docker_image_name}-image" job_definition_name += f"_{minimum_worker_ram_in_gib}-GiB-RAM" @@ -546,7 +545,6 @@ def _generate_job_definition_name( job_definition_name += f"_{efs_id}" if docker_tag is None or docker_tag == "latest": date = datetime.now().strftime("%Y-%m-%d") - return job_definition_name @@ -644,7 +642,7 @@ def _ensure_job_definition_exists_and_get_arn( }, }, ] - mountPoints = [{"containerPath": "/mnt/efs/", "readOnly": False, "sourceVolume": "neuroconv_batch_efs_mounted"}] + mountPoints = [{"containerPath": "/mnt/efs", "readOnly": False, "sourceVolume": "neuroconv_batch_efs_mounted"}] # batch_client.register_job_definition is not synchronous and so we need to wait a bit afterwards batch_client.register_job_definition( diff --git a/src/neuroconv/tools/hdmf.py b/src/neuroconv/tools/hdmf.py index 660971df5..f32ea23a0 100644 --- a/src/neuroconv/tools/hdmf.py +++ b/src/neuroconv/tools/hdmf.py @@ -50,6 +50,7 @@ def estimate_default_chunk_shape(chunk_mb: float, maxshape: tuple[int, ...], dty def estimate_default_buffer_shape( buffer_gb: float, chunk_shape: tuple[int, ...], maxshape: tuple[int, ...], dtype: np.dtype ) -> tuple[int, ...]: + # TODO: Ad ddocstring to this once someone understands it better # Elevate any overflow warnings to trigger error. # This is usually an indicator of something going terribly wrong with the estimation calculations and should be # avoided at all costs. diff --git a/src/neuroconv/tools/nwb_helpers/_metadata_and_file_helpers.py b/src/neuroconv/tools/nwb_helpers/_metadata_and_file_helpers.py index c3aaea48d..355c86510 100644 --- a/src/neuroconv/tools/nwb_helpers/_metadata_and_file_helpers.py +++ b/src/neuroconv/tools/nwb_helpers/_metadata_and_file_helpers.py @@ -8,7 +8,6 @@ from datetime import datetime from pathlib import Path from typing import Literal, Optional -from warnings import warn from hdmf_zarr import NWBZarrIO from pydantic import FilePath @@ -26,7 +25,7 @@ def get_module(nwbfile: NWBFile, name: str, description: str = None): """Check if processing module exists. If not, create it. Then return module.""" if name in nwbfile.processing: if description is not None and nwbfile.processing[name].description != description: - warn( + warnings.warn( "Custom description given to get_module does not match existing module description! " "Ignoring custom description." ) @@ -157,7 +156,7 @@ def _attempt_cleanup_of_existing_nwbfile(nwbfile_path: Path) -> None: # Windows in particular can encounter errors at this step except PermissionError: # pragma: no cover message = f"Unable to remove NWB file located at {nwbfile_path.absolute()}! Please remove it manually." 
- warn(message=message, stacklevel=2) + warnings.warn(message=message, stacklevel=2) @contextmanager diff --git a/src/neuroconv/tools/testing/data_interface_mixins.py b/src/neuroconv/tools/testing/data_interface_mixins.py index fab049165..dc45cec53 100644 --- a/src/neuroconv/tools/testing/data_interface_mixins.py +++ b/src/neuroconv/tools/testing/data_interface_mixins.py @@ -33,7 +33,7 @@ configure_backend, get_default_backend_configuration, ) -from neuroconv.utils import _NWBMetaDataEncoder +from neuroconv.utils.json_schema import _NWBMetaDataEncoder class DataInterfaceTestMixin: diff --git a/src/neuroconv/tools/testing/mock_interfaces.py b/src/neuroconv/tools/testing/mock_interfaces.py index 0652284e7..0350806bb 100644 --- a/src/neuroconv/tools/testing/mock_interfaces.py +++ b/src/neuroconv/tools/testing/mock_interfaces.py @@ -57,31 +57,70 @@ def get_source_schema(cls) -> dict: def __init__(self, event_times: Optional[ArrayType] = None): """ - Define event times for some behavior. + Initialize the interface with event times for behavior. Parameters ---------- event_times : list of floats, optional The event times to set as timestamps for this interface. - The default is the array [1.2, 2.3, 3.4] for similarity to the timescale of the MockSpikeGLXNIDQInterface. + The default is the array [1.2, 2.3, 3.4] to simulate a time series similar to the + MockSpikeGLXNIDQInterface. """ event_times = event_times or [1.2, 2.3, 3.4] self.event_times = np.array(event_times) self.original_event_times = np.array(event_times) # Make a copy of the initial loaded timestamps def get_original_timestamps(self) -> np.ndarray: + """ + Get the original event times before any alignment or transformation. + + Returns + ------- + np.ndarray + The original event times as a NumPy array. + """ return self.original_event_times def get_timestamps(self) -> np.ndarray: + """ + Get the current (possibly aligned) event times. + + Returns + ------- + np.ndarray + The current event times as a NumPy array, possibly modified after alignment. + """ return self.event_times def set_aligned_timestamps(self, aligned_timestamps: np.ndarray): + """ + Set the event times after alignment. + + Parameters + ---------- + aligned_timestamps : np.ndarray + The aligned event timestamps to update the internal event times. + """ self.event_times = aligned_timestamps def add_to_nwbfile(self, nwbfile: NWBFile, metadata: dict): + """ + Add the event times to an NWBFile as a DynamicTable. + + Parameters + ---------- + nwbfile : NWBFile + The NWB file to which the event times will be added. + metadata : dict + Metadata to describe the event times in the NWB file. + + Notes + ----- + This method creates a DynamicTable to store event times and adds it to the NWBFile's acquisition. 
+ """ table = DynamicTable(name="BehaviorEvents", description="Times of various classified behaviors.") table.add_column(name="event_time", description="Time of each event.") - for timestamp in self.get_timestamps(): # adding data by column gives error + for timestamp in self.get_timestamps(): table.add_row(event_time=timestamp) nwbfile.add_acquisition(table) @@ -119,6 +158,9 @@ def __init__( """ from spikeinterface.extractors import NumpyRecording + self.has_analog_channels = True + self.has_digital_channels = False + if ttl_times is None: # Begin in 'off' state number_of_periods = int(np.ceil((signal_duration - ttl_duration) / (ttl_duration * 2))) @@ -127,6 +169,7 @@ def __init__( number_of_channels = len(ttl_times) channel_ids = [f"nidq#XA{channel_index}" for channel_index in range(number_of_channels)] # NIDQ channel IDs channel_groups = ["NIDQChannelGroup"] * number_of_channels + self.analog_channel_ids = channel_ids sampling_frequency = 25_000.0 # NIDQ sampling rate number_of_frames = int(signal_duration * sampling_frequency) @@ -178,6 +221,9 @@ def __init__( ) def get_metadata(self) -> dict: + """ + Returns the metadata dictionary for the current object. + """ metadata = super().get_metadata() session_start_time = datetime.now().astimezone() metadata["NWBFile"]["session_start_time"] = session_start_time @@ -225,7 +271,7 @@ def __init__( verbose=verbose, ) - def get_metadata(self) -> dict: # noqa D102 + def get_metadata(self) -> dict: metadata = super().get_metadata() session_start_time = datetime.now().astimezone() metadata["NWBFile"]["session_start_time"] = session_start_time diff --git a/src/neuroconv/tools/yaml_conversion_specification/_yaml_conversion_specification.py b/src/neuroconv/tools/yaml_conversion_specification/_yaml_conversion_specification.py index 10e33cbc8..7cdec0d2c 100644 --- a/src/neuroconv/tools/yaml_conversion_specification/_yaml_conversion_specification.py +++ b/src/neuroconv/tools/yaml_conversion_specification/_yaml_conversion_specification.py @@ -1,12 +1,15 @@ -import sys +import json +import os from importlib import import_module from pathlib import Path from typing import Optional import click -from jsonschema import RefResolver, validate +from jsonschema import validate from pydantic import DirectoryPath, FilePath +from referencing import Registry, Resource +from ..data_transfers import automatic_dandi_upload from ...nwbconverter import NWBConverter from ...utils import dict_deep_update, load_dict_from_file @@ -50,7 +53,7 @@ def run_conversion_from_yaml( data_folder_path: Optional[DirectoryPath] = None, output_folder_path: Optional[DirectoryPath] = None, overwrite: bool = False, -): +) -> None: """ Run conversion to NWB given a yaml specification file. 
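For orientation, a minimal sketch of how this entry point is typically driven once `upload_to_dandiset` is supported; the specification and folder paths below are hypothetical, and `DANDI_API_KEY` must already be set in the environment (that requirement is enforced further down in this function):

import os
from pathlib import Path

from neuroconv.tools.yaml_conversion_specification import run_conversion_from_yaml

# Required whenever the specification contains 'upload_to_dandiset'.
assert "DANDI_API_KEY" in os.environ

run_conversion_from_yaml(
    specification_file_path=Path("conversion_specifications/GIN_conversion_specification_dandi_upload.yml"),
    data_folder_path=Path("/data/ephy_testing_data"),  # hypothetical location of the source data
    output_folder_path=Path("/tmp/nwb_outputs"),  # hypothetical scratch folder for the written NWB files
    overwrite=True,
)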
@@ -73,20 +76,41 @@ def run_conversion_from_yaml( if data_folder_path is None: data_folder_path = Path(specification_file_path).parent + else: + data_folder_path = Path(data_folder_path) + data_folder_path.mkdir(exist_ok=True) + if output_folder_path is None: - output_folder_path = Path(specification_file_path).parent + output_folder_path = specification_file_path.parent else: output_folder_path = Path(output_folder_path) + output_folder_path.mkdir(exist_ok=True) + specification = load_dict_from_file(file_path=specification_file_path) schema_folder = Path(__file__).parent.parent.parent / "schemas" + + # Load all required schemas specification_schema = load_dict_from_file(file_path=schema_folder / "yaml_conversion_specification_schema.json") - sys_uri_base = "file:/" if sys.platform.startswith("win32") else "file://" + metadata_schema = load_dict_from_file(file_path=schema_folder / "metadata_schema.json") + + # The yaml specification references the metadata schema, so we need to load it into the registry + registry = Registry().with_resource("metadata_schema.json", Resource.from_contents(metadata_schema)) + + # Validate using the registry validate( instance=specification, schema=specification_schema, - resolver=RefResolver(base_uri=sys_uri_base + str(schema_folder) + "/", referrer=specification_schema), + registry=registry, ) + upload_to_dandiset = "upload_to_dandiset" in specification + if upload_to_dandiset and "DANDI_API_KEY" not in os.environ: + message = ( + "The 'upload_to_dandiset' prompt was found in the YAML specification, " + "but the environment variable 'DANDI_API_KEY' was not set." + ) + raise ValueError(message) + global_metadata = specification.get("metadata", dict()) global_conversion_options = specification.get("conversion_options", dict()) data_interfaces_spec = specification.get("data_interfaces") @@ -102,6 +126,7 @@ def run_conversion_from_yaml( experiment_metadata = experiment.get("metadata", dict()) for session in experiment["sessions"]: file_counter += 1 + source_data = session["source_data"] for interface_name, interface_source_data in session["source_data"].items(): for key, value in interface_source_data.items(): @@ -109,21 +134,47 @@ def run_conversion_from_yaml( source_data[interface_name].update({key: [str(Path(data_folder_path) / x) for x in value]}) elif key in ("file_path", "folder_path"): source_data[interface_name].update({key: str(Path(data_folder_path) / value)}) + converter = CustomNWBConverter(source_data=source_data) + metadata = converter.get_metadata() for metadata_source in [global_metadata, experiment_metadata, session.get("metadata", dict())]: metadata = dict_deep_update(metadata, metadata_source) - nwbfile_name = session.get("nwbfile_name", f"temp_nwbfile_name_{file_counter}").strip(".nwb") + + session_id = session.get("metadata", dict()).get("NWBFile", dict()).get("session_id", None) + if upload_to_dandiset and session_id is None: + message = ( + "The 'upload_to_dandiset' prompt was found in the YAML specification, " + "but the 'session_id' was not found for session with info block: " + f"\n\n {json.dumps(obj=session, indent=2)}\n\n" + "File intended for DANDI upload must include a session ID." 
+ ) + raise ValueError(message) + session_conversion_options = session.get("conversion_options", dict()) conversion_options = dict() for key in converter.data_interface_objects: conversion_options[key] = dict(session_conversion_options.get(key, dict()), **global_conversion_options) + + nwbfile_name = session.get("nwbfile_name", f"temp_nwbfile_name_{file_counter}").strip(".nwb") converter.run_conversion( nwbfile_path=output_folder_path / f"{nwbfile_name}.nwb", metadata=metadata, overwrite=overwrite, conversion_options=conversion_options, ) + + if upload_to_dandiset: + dandiset_id = specification["upload_to_dandiset"] + staging = int(dandiset_id) >= 200_000 + automatic_dandi_upload( + dandiset_id=dandiset_id, + nwb_folder_path=output_folder_path, + staging=staging, + ) + + return None # We can early return since organization below will occur within the upload step + # To properly mimic a true dandi organization, the full directory must be populated with NWBFiles. all_nwbfile_paths = [nwbfile_path for nwbfile_path in output_folder_path.iterdir() if nwbfile_path.suffix == ".nwb"] nwbfile_paths_to_set = [ diff --git a/src/neuroconv/utils/__init__.py b/src/neuroconv/utils/__init__.py index c0061a983..f7163f3ff 100644 --- a/src/neuroconv/utils/__init__.py +++ b/src/neuroconv/utils/__init__.py @@ -7,7 +7,7 @@ load_dict_from_file, ) from .json_schema import ( - _NWBMetaDataEncoder, + NWBMetaDataEncoder, fill_defaults, get_base_schema, get_metadata_schema_for_icephys, diff --git a/src/neuroconv/utils/dict.py b/src/neuroconv/utils/dict.py index f0507b653..a6cef630a 100644 --- a/src/neuroconv/utils/dict.py +++ b/src/neuroconv/utils/dict.py @@ -209,12 +209,29 @@ class DeepDict(defaultdict): """A defaultdict of defaultdicts""" def __init__(self, *args: Any, **kwargs: Any) -> None: + """A defaultdict of defaultdicts""" super().__init__(lambda: DeepDict(), *args, **kwargs) for key, value in self.items(): if isinstance(value, dict): self[key] = DeepDict(value) def deep_update(self, other: Optional[Union[dict, "DeepDict"]] = None, **kwargs) -> None: + """ + Recursively update the DeepDict with another dictionary or DeepDict. + + Parameters + ---------- + other : dict or DeepDict, optional + The dictionary or DeepDict to update the current instance with. + **kwargs : Any + Additional keyword arguments representing key-value pairs to update the DeepDict. + + Notes + ----- + For any keys that exist in both the current instance and the provided dictionary, the values are merged + recursively if both are dictionaries. Otherwise, the value from `other` or `kwargs` will overwrite the + existing value. + """ for key, value in (other or kwargs).items(): if key in self and isinstance(self[key], dict) and isinstance(value, dict): self[key].deep_update(value) diff --git a/src/neuroconv/utils/json_schema.py b/src/neuroconv/utils/json_schema.py index 07dc3321f..6aa7a75d0 100644 --- a/src/neuroconv/utils/json_schema.py +++ b/src/neuroconv/utils/json_schema.py @@ -16,13 +16,8 @@ from pynwb.icephys import IntracellularElectrode -class _NWBMetaDataEncoder(json.JSONEncoder): - """ - Custom JSON encoder for NWB metadata. - - This encoder extends the default JSONEncoder class and provides custom serialization - for certain data types commonly used in NWB metadata. 
- """ +class _GenericNeuroconvEncoder(json.JSONEncoder): + """Generic JSON encoder for NeuroConv data.""" def default(self, obj): """ @@ -36,45 +31,38 @@ def default(self, obj): if isinstance(obj, np.generic): return obj.item() + # Numpy arrays should be converted to lists if isinstance(obj, np.ndarray): return obj.tolist() + # Over-write behaviors for Paths + if isinstance(obj, Path): + return str(obj) + # The base-class handles it return super().default(obj) -class _NWBSourceDataEncoder(_NWBMetaDataEncoder): +class _NWBMetaDataEncoder(_GenericNeuroconvEncoder): """ - Custom JSON encoder for data interface source data (i.e. kwargs). - - This encoder extends the default JSONEncoder class and provides custom serialization - for certain data types commonly used in interface source data. + Custom JSON encoder for NWB metadata. """ - def default(self, obj): - # Over-write behaviors for Paths - if isinstance(obj, Path): - return str(obj) - - return super().default(obj) +class _NWBSourceDataEncoder(_GenericNeuroconvEncoder): + """ + Custom JSON encoder for data interface source data (i.e. kwargs). + """ -class _NWBConversionOptionsEncoder(_NWBMetaDataEncoder): +class _NWBConversionOptionsEncoder(_GenericNeuroconvEncoder): """ Custom JSON encoder for conversion options of the data interfaces and converters (i.e. kwargs). - - This encoder extends the default JSONEncoder class and provides custom serialization - for certain data types commonly used in interface source data. """ - def default(self, obj): - - # Over-write behaviors for Paths - if isinstance(obj, Path): - return str(obj) - return super().default(obj) +# This is used in the Guide so we will keep it public. +NWBMetaDataEncoder = _NWBMetaDataEncoder def get_base_schema( diff --git a/tests/imports.py b/tests/imports.py index 5f8b65e72..7ac95713b 100644 --- a/tests/imports.py +++ b/tests/imports.py @@ -68,6 +68,7 @@ def test_tools(self): "get_package_version", "is_package_installed", "deploy_process", + "data_transfers", "LocalPathExpander", "get_module", ] diff --git a/tests/test_ecephys/test_mock_nidq_interface.py b/tests/test_ecephys/test_mock_nidq_interface.py index c0fb4eed2..1b098e1bb 100644 --- a/tests/test_ecephys/test_mock_nidq_interface.py +++ b/tests/test_ecephys/test_mock_nidq_interface.py @@ -1,4 +1,3 @@ -import pathlib from datetime import datetime from numpy.testing import assert_array_almost_equal @@ -46,47 +45,26 @@ def test_mock_metadata(): metadata = interface.get_metadata() - expected_ecephys_metadata = { - "Ecephys": { - "Device": [ - { - "name": "NIDQBoard", - "description": "A NIDQ board used in conjunction with SpikeGLX.", - "manufacturer": "National Instruments", - }, - ], - "ElectrodeGroup": [ - { - "name": "NIDQChannelGroup", - "description": "A group representing the NIDQ channels.", - "device": "NIDQBoard", - "location": "unknown", - }, - ], - "Electrodes": [ - {"name": "group_name", "description": "Name of the ElectrodeGroup this electrode is a part of."} - ], - "ElectricalSeriesNIDQ": { - "name": "ElectricalSeriesNIDQ", - "description": "Raw acquisition traces from the NIDQ (.nidq.bin) channels.", - }, - } - } - - assert metadata["Ecephys"] == expected_ecephys_metadata["Ecephys"] + expected_devices_metadata = [ + { + "name": "NIDQBoard", + "description": "A NIDQ board used in conjunction with SpikeGLX.", + "manufacturer": "National Instruments", + }, + ] + + assert metadata["Devices"] == expected_devices_metadata expected_start_time = datetime(2020, 11, 3, 10, 35, 10) assert 
metadata["NWBFile"]["session_start_time"] == expected_start_time -def test_mock_run_conversion(tmpdir: pathlib.Path): +def test_mock_run_conversion(tmp_path): interface = MockSpikeGLXNIDQInterface() metadata = interface.get_metadata() - test_directory = pathlib.Path(tmpdir) / "TestMockSpikeGLXNIDQInterface" - test_directory.mkdir(exist_ok=True) - nwbfile_path = test_directory / "test_mock_run_conversion.nwb" + nwbfile_path = tmp_path / "test_mock_run_conversion.nwb" interface.run_conversion(nwbfile_path=nwbfile_path, metadata=metadata, overwrite=True) with NWBHDF5IO(path=nwbfile_path, mode="r") as io: @@ -94,11 +72,3 @@ def test_mock_run_conversion(tmpdir: pathlib.Path): assert "NIDQBoard" in nwbfile.devices assert len(nwbfile.devices) == 1 - - assert "NIDQChannelGroup" in nwbfile.electrode_groups - assert len(nwbfile.electrode_groups) == 1 - - assert list(nwbfile.electrodes.id[:]) == [0, 1, 2, 3, 4, 5, 6, 7] - - assert "ElectricalSeriesNIDQ" in nwbfile.acquisition - assert len(nwbfile.acquisition) == 1 diff --git a/tests/test_minimal/test_tools/dandi_transfer_tools.py b/tests/test_minimal/test_tools/dandi_transfer_tools.py index df4226d10..da35725a0 100644 --- a/tests/test_minimal/test_tools/dandi_transfer_tools.py +++ b/tests/test_minimal/test_tools/dandi_transfer_tools.py @@ -1,13 +1,9 @@ import os import sys from datetime import datetime -from pathlib import Path from platform import python_version as get_python_version -from shutil import rmtree -from tempfile import mkdtemp import pytest -from hdmf.testing import TestCase from pynwb import NWBHDF5IO from neuroconv.tools.data_transfers import automatic_dandi_upload @@ -24,80 +20,63 @@ not HAVE_DANDI_KEY, reason="You must set your DANDI_API_KEY to run this test!", ) -class TestAutomaticDANDIUpload(TestCase): - def setUp(self): - self.tmpdir = Path(mkdtemp()) - self.nwb_folder_path = self.tmpdir / "test_nwb" - self.nwb_folder_path.mkdir() - metadata = get_default_nwbfile_metadata() - metadata["NWBFile"].update( - session_start_time=datetime.now().astimezone(), - session_id=f"test-automatic-upload-{sys.platform}-{get_python_version().replace('.', '-')}", - ) - metadata.update(Subject=dict(subject_id="foo", species="Mus musculus", age="P1D", sex="U")) - with NWBHDF5IO(path=self.nwb_folder_path / "test_nwb_1.nwb", mode="w") as io: - io.write(make_nwbfile_from_metadata(metadata=metadata)) +def test_automatic_dandi_upload(tmp_path): + nwb_folder_path = tmp_path / "test_nwb" + nwb_folder_path.mkdir() + metadata = get_default_nwbfile_metadata() + metadata["NWBFile"].update( + session_start_time=datetime.now().astimezone(), + session_id=f"test-automatic-upload-{sys.platform}-{get_python_version().replace('.', '-')}", + ) + metadata.update(Subject=dict(subject_id="foo", species="Mus musculus", age="P1D", sex="U")) + with NWBHDF5IO(path=nwb_folder_path / "test_nwb_1.nwb", mode="w") as io: + io.write(make_nwbfile_from_metadata(metadata=metadata)) - def tearDown(self): - rmtree(self.tmpdir) - - def test_automatic_dandi_upload(self): - automatic_dandi_upload(dandiset_id="200560", nwb_folder_path=self.nwb_folder_path, staging=True) + automatic_dandi_upload(dandiset_id="200560", nwb_folder_path=nwb_folder_path, staging=True) @pytest.mark.skipif( not HAVE_DANDI_KEY, reason="You must set your DANDI_API_KEY to run this test!", ) -class TestAutomaticDANDIUploadNonParallel(TestCase): - def setUp(self): - self.tmpdir = Path(mkdtemp()) - self.nwb_folder_path = self.tmpdir / "test_nwb" - self.nwb_folder_path.mkdir() - metadata = get_default_nwbfile_metadata() 
- metadata["NWBFile"].update( - session_start_time=datetime.now().astimezone(), - session_id=f"test-automatic-upload-{sys.platform}-{get_python_version().replace('.', '-')}-non-parallel", - ) - metadata.update(Subject=dict(subject_id="foo", species="Mus musculus", age="P1D", sex="U")) - with NWBHDF5IO(path=self.nwb_folder_path / "test_nwb_2.nwb", mode="w") as io: - io.write(make_nwbfile_from_metadata(metadata=metadata)) - - def tearDown(self): - rmtree(self.tmpdir) +def test_automatic_dandi_upload_non_parallel(tmp_path): + nwb_folder_path = tmp_path / "test_nwb" + nwb_folder_path.mkdir() + metadata = get_default_nwbfile_metadata() + metadata["NWBFile"].update( + session_start_time=datetime.now().astimezone(), + session_id=(f"test-automatic-upload-{sys.platform}-" f"{get_python_version().replace('.', '-')}-non-parallel"), + ) + metadata.update(Subject=dict(subject_id="foo", species="Mus musculus", age="P1D", sex="U")) + with NWBHDF5IO(path=nwb_folder_path / "test_nwb_2.nwb", mode="w") as io: + io.write(make_nwbfile_from_metadata(metadata=metadata)) - def test_automatic_dandi_upload_non_parallel(self): - automatic_dandi_upload( - dandiset_id="200560", nwb_folder_path=self.nwb_folder_path, staging=True, number_of_jobs=1 - ) + automatic_dandi_upload(dandiset_id="200560", nwb_folder_path=nwb_folder_path, staging=True, number_of_jobs=1) @pytest.mark.skipif( not HAVE_DANDI_KEY, reason="You must set your DANDI_API_KEY to run this test!", ) -class TestAutomaticDANDIUploadNonParallelNonThreaded(TestCase): - def setUp(self): - self.tmpdir = Path(mkdtemp()) - self.nwb_folder_path = self.tmpdir / "test_nwb" - self.nwb_folder_path.mkdir() - metadata = get_default_nwbfile_metadata() - metadata["NWBFile"].update( - session_start_time=datetime.now().astimezone(), - session_id=f"test-automatic-upload-{sys.platform}-{get_python_version().replace('.', '-')}-non-parallel-non-threaded", - ) - metadata.update(Subject=dict(subject_id="foo", species="Mus musculus", age="P1D", sex="U")) - with NWBHDF5IO(path=self.nwb_folder_path / "test_nwb_3.nwb", mode="w") as io: - io.write(make_nwbfile_from_metadata(metadata=metadata)) - - def tearDown(self): - rmtree(self.tmpdir) +def test_automatic_dandi_upload_non_parallel_non_threaded(tmp_path): + nwb_folder_path = tmp_path / "test_nwb" + nwb_folder_path.mkdir() + metadata = get_default_nwbfile_metadata() + metadata["NWBFile"].update( + session_start_time=datetime.now().astimezone(), + session_id=( + f"test-automatic-upload-{sys.platform}-" + f"{get_python_version().replace('.', '-')}-non-parallel-non-threaded" + ), + ) + metadata.update(Subject=dict(subject_id="foo", species="Mus musculus", age="P1D", sex="U")) + with NWBHDF5IO(path=nwb_folder_path / "test_nwb_3.nwb", mode="w") as io: + io.write(make_nwbfile_from_metadata(metadata=metadata)) - def test_automatic_dandi_upload_non_parallel_non_threaded(self): - automatic_dandi_upload( - dandiset_id="200560", - nwb_folder_path=self.nwb_folder_path, - staging=True, - number_of_jobs=1, - number_of_threads=1, - ) + automatic_dandi_upload( + dandiset_id="200560", + nwb_folder_path=nwb_folder_path, + staging=True, + number_of_jobs=1, + number_of_threads=1, + ) diff --git a/tests/test_minimal/test_tools/test_expand_paths.py b/tests/test_minimal/test_tools/test_expand_paths.py index 9e7f03631..59924f93a 100644 --- a/tests/test_minimal/test_tools/test_expand_paths.py +++ b/tests/test_minimal/test_tools/test_expand_paths.py @@ -9,7 +9,7 @@ from neuroconv.tools import LocalPathExpander from neuroconv.tools.path_expansion import 
construct_path_template from neuroconv.tools.testing import generate_path_expander_demo_ibl -from neuroconv.utils import _NWBMetaDataEncoder +from neuroconv.utils.json_schema import _NWBMetaDataEncoder def create_test_directories_and_files( diff --git a/tests/test_minimal/test_utils/test_json_schema_utils.py b/tests/test_minimal/test_utils/test_json_schema_utils.py index 4edf1e724..5ce63ee56 100644 --- a/tests/test_minimal/test_utils/test_json_schema_utils.py +++ b/tests/test_minimal/test_utils/test_json_schema_utils.py @@ -6,12 +6,12 @@ from pynwb.ophys import ImagingPlane, TwoPhotonSeries from neuroconv.utils import ( - _NWBMetaDataEncoder, dict_deep_update, fill_defaults, get_schema_from_hdmf_class, load_dict_from_file, ) +from neuroconv.utils.json_schema import _NWBMetaDataEncoder def compare_dicts(a: dict, b: dict): diff --git a/tests/test_on_data/ecephys/spikeglx_single_probe_metadata.json b/tests/test_on_data/ecephys/spikeglx_single_probe_metadata.json index 637124cd0..f3f5fb595 100644 --- a/tests/test_on_data/ecephys/spikeglx_single_probe_metadata.json +++ b/tests/test_on_data/ecephys/spikeglx_single_probe_metadata.json @@ -8,11 +8,6 @@ "name": "NeuropixelImec0", "description": "{\"probe_type\": \"0\", \"probe_type_description\": \"NP1.0\", \"flex_part_number\": \"NP2_FLEX_0\", \"connected_base_station_part_number\": \"NP2_QBSC_00\"}", "manufacturer": "Imec" - }, - { - "name": "NIDQBoard", - "description": "A NIDQ board used in conjunction with SpikeGLX.", - "manufacturer": "National Instruments" } ], "ElectrodeGroup": [ @@ -21,17 +16,12 @@ "description": "A group representing probe/shank 'Imec0'.", "location": "unknown", "device": "NeuropixelImec0" - }, - { - "name": "NIDQChannelGroup", - "description": "A group representing the NIDQ channels.", - "location": "unknown", - "device": "NIDQBoard" } + ], - "ElectricalSeriesAPImec0": { - "name": "ElectricalSeriesAPImec0", - "description": "Acquisition traces for the ElectricalSeriesAPImec0." + "ElectricalSeriesAP": { + "name": "ElectricalSeriesAP", + "description": "Acquisition traces for the ElectricalSeriesAP." }, "Electrodes": [ { @@ -47,13 +37,9 @@ "description": "The id of the contact on the electrode" } ], - "ElectricalSeriesNIDQ": { - "name": "ElectricalSeriesNIDQ", - "description": "Raw acquisition traces from the NIDQ (.nidq.bin) channels." - }, - "ElectricalSeriesLFImec0": { - "name": "ElectricalSeriesLFImec0", - "description": "Acquisition traces for the ElectricalSeriesLFImec0." + "ElectricalSeriesLF": { + "name": "ElectricalSeriesLF", + "description": "Acquisition traces for the ElectricalSeriesLF." 
} } } diff --git a/tests/test_on_data/ecephys/test_aux_interfaces.py b/tests/test_on_data/ecephys/test_aux_interfaces.py deleted file mode 100644 index 7934e29a1..000000000 --- a/tests/test_on_data/ecephys/test_aux_interfaces.py +++ /dev/null @@ -1,100 +0,0 @@ -import unittest -from datetime import datetime - -import pytest -from parameterized import param, parameterized -from spikeinterface.core.testing import check_recordings_equal -from spikeinterface.extractors import NwbRecordingExtractor - -from neuroconv import NWBConverter -from neuroconv.datainterfaces import SpikeGLXNIDQInterface - -# enable to run locally in interactive mode -try: - from ..setup_paths import ECEPHY_DATA_PATH as DATA_PATH - from ..setup_paths import OUTPUT_PATH -except ImportError: - from setup_paths import ECEPHY_DATA_PATH as DATA_PATH - from setup_paths import OUTPUT_PATH - -if not DATA_PATH.exists(): - pytest.fail(f"No folder found in location: {DATA_PATH}!") - - -def custom_name_func(testcase_func, param_num, param): - interface_name = param.kwargs["data_interface"].__name__ - reduced_interface_name = interface_name.replace("Interface", "") - - return ( - f"{testcase_func.__name__}_{param_num}_" - f"{parameterized.to_safe_name(reduced_interface_name)}" - f"_{param.kwargs.get('case_name', '')}" - ) - - -class TestEcephysAuxNwbConversions(unittest.TestCase): - savedir = OUTPUT_PATH - - parameterized_aux_list = [ - param( - data_interface=SpikeGLXNIDQInterface, - interface_kwargs=dict(file_path=str(DATA_PATH / "spikeglx" / "Noise4Sam_g0" / "Noise4Sam_g0_t0.nidq.bin")), - case_name="load_sync_channel_False", - ), - param( - data_interface=SpikeGLXNIDQInterface, - interface_kwargs=dict( - file_path=str(DATA_PATH / "spikeglx" / "Noise4Sam_g0" / "Noise4Sam_g0_t0.nidq.bin"), - load_sync_channel=True, - ), - case_name="load_sync_channel_True", - ), - ] - - @parameterized.expand(input=parameterized_aux_list, name_func=custom_name_func) - def test_aux_recording_extractor_to_nwb(self, data_interface, interface_kwargs, case_name=""): - nwbfile_path = str(self.savedir / f"{data_interface.__name__}_{case_name}.nwb") - - class TestConverter(NWBConverter): - data_interface_classes = dict(TestAuxRecording=data_interface) - - converter = TestConverter(source_data=dict(TestAuxRecording=interface_kwargs)) - - for interface_kwarg in interface_kwargs: - if interface_kwarg in ["file_path", "folder_path"]: - self.assertIn( - member=interface_kwarg, container=converter.data_interface_objects["TestAuxRecording"].source_data - ) - - metadata = converter.get_metadata() - metadata["NWBFile"].update(session_start_time=datetime.now().astimezone()) - converter.run_conversion(nwbfile_path=nwbfile_path, overwrite=True, metadata=metadata) - recording = converter.data_interface_objects["TestAuxRecording"].recording_extractor - - electrical_series_name = metadata["Ecephys"][converter.data_interface_objects["TestAuxRecording"].es_key][ - "name" - ] - - # NWBRecordingExtractor on spikeinterface does not yet support loading data written from multiple segments. 
-        if recording.get_num_segments() == 1:
-            # Spikeinterface behavior is to load the electrode table channel_name property as a channel_id
-            nwb_recording = NwbRecordingExtractor(file_path=nwbfile_path, electrical_series_name=electrical_series_name)
-            if "channel_name" in recording.get_property_keys():
-                renamed_channel_ids = recording.get_property("channel_name")
-            else:
-                renamed_channel_ids = recording.get_channel_ids().astype("str")
-            recording = recording.channel_slice(
-                channel_ids=recording.get_channel_ids(), renamed_channel_ids=renamed_channel_ids
-            )
-
-            # Edge case that only occurs in testing; I think it's fixed in > 0.96.1 versions (unreleased as of 1/11/23)
-            # The NwbRecordingExtractor on spikeinterface experiences an issue when duplicated channel_ids
-            # are specified, which occurs during check_recordings_equal when there is only one channel
-            if nwb_recording.get_channel_ids()[0] != nwb_recording.get_channel_ids()[-1]:
-                check_recordings_equal(RX1=recording, RX2=nwb_recording, return_scaled=False)
-            if recording.has_scaled_traces() and nwb_recording.has_scaled_traces():
-                check_recordings_equal(RX1=recording, RX2=nwb_recording, return_scaled=True)
-
-
-if __name__ == "__main__":
-    unittest.main()
diff --git a/tests/test_on_data/ecephys/test_lfp.py b/tests/test_on_data/ecephys/test_lfp.py
index c46f8d297..010516d86 100644
--- a/tests/test_on_data/ecephys/test_lfp.py
+++ b/tests/test_on_data/ecephys/test_lfp.py
@@ -57,9 +57,7 @@ class TestEcephysLFPNwbConversions(unittest.TestCase):
         param(
             data_interface=SpikeGLXRecordingInterface,
             interface_kwargs=dict(
-                file_path=(
-                    DATA_PATH / "spikeglx" / "Noise4Sam_g0" / "Noise4Sam_g0_imec0" / "Noise4Sam_g0_t0.imec0.lf.bin"
-                )
+                folder_path=DATA_PATH / "spikeglx" / "Noise4Sam_g0" / "Noise4Sam_g0_imec0", stream_id="imec0.lf"
             ),
             expected_write_module="raw",
         ),
diff --git a/tests/test_on_data/ecephys/test_nidq_interface.py b/tests/test_on_data/ecephys/test_nidq_interface.py
new file mode 100644
index 000000000..6d6517323
--- /dev/null
+++ b/tests/test_on_data/ecephys/test_nidq_interface.py
@@ -0,0 +1,57 @@
+import numpy as np
+import pytest
+from pynwb import NWBHDF5IO
+
+from neuroconv.datainterfaces import SpikeGLXNIDQInterface
+
+# enable to run locally in interactive mode
+try:
+    from ..setup_paths import ECEPHY_DATA_PATH
+except ImportError:
+    from setup_paths import ECEPHY_DATA_PATH
+
+if not ECEPHY_DATA_PATH.exists():
+    pytest.fail(f"No folder found in location: {ECEPHY_DATA_PATH}!")
+
+
+def test_nidq_interface_digital_data(tmp_path):
+
+    nwbfile_path = tmp_path / "nidq_test_digital.nwb"
+    folder_path = ECEPHY_DATA_PATH / "spikeglx" / "DigitalChannelTest_g0"
+    interface = SpikeGLXNIDQInterface(folder_path=folder_path)
+    interface.run_conversion(nwbfile_path=nwbfile_path, overwrite=True)
+
+    with NWBHDF5IO(nwbfile_path, "r") as io:
+        nwbfile = io.read()
+        assert len(nwbfile.acquisition) == 1  # Only one channel has data for this set
+        events = nwbfile.acquisition["EventsNIDQDigitalChannelXD0"]
+        assert events.name == "EventsNIDQDigitalChannelXD0"
+        assert events.timestamps.size == 326
+        assert len(nwbfile.devices) == 1
+
+        data = events.data
+        # The number of on (1) and off (0) events should match
+        assert np.sum(data == 1) == 163
+        assert np.sum(data == 0) == 163
+
+
+def test_nidq_interface_analog_data(tmp_path):
+
+    nwbfile_path = tmp_path / "nidq_test_analog.nwb"
+    folder_path = ECEPHY_DATA_PATH / "spikeglx" / "Noise4Sam_g0"
+    interface = SpikeGLXNIDQInterface(folder_path=folder_path)
+    interface.run_conversion(nwbfile_path=nwbfile_path, overwrite=True)
+
+
with NWBHDF5IO(nwbfile_path, "r") as io: + nwbfile = io.read() + assert len(nwbfile.acquisition) == 1 # The time series object + time_series = nwbfile.acquisition["TimeSeriesNIDQ"] + assert time_series.name == "TimeSeriesNIDQ" + expected_description = "Analog data from the NIDQ board. Channels are ['XA0' 'XA1' 'XA2' 'XA3' 'XA4' 'XA5' 'XA6' 'XA7'] in that order." + assert time_series.description == expected_description + number_of_samples = time_series.data.shape[0] + assert number_of_samples == 60_864 + number_of_channels = time_series.data.shape[1] + assert number_of_channels == 8 + + assert len(nwbfile.devices) == 1 diff --git a/tests/test_on_data/ecephys/test_recording_interfaces.py b/tests/test_on_data/ecephys/test_recording_interfaces.py index 7677ded22..0520b5b42 100644 --- a/tests/test_on_data/ecephys/test_recording_interfaces.py +++ b/tests/test_on_data/ecephys/test_recording_interfaces.py @@ -641,9 +641,7 @@ def test_extracted_metadata(self, setup_interface): class TestSpikeGLXRecordingInterface(RecordingExtractorInterfaceTestMixin): data_interface_cls = SpikeGLXRecordingInterface interface_kwargs = dict( - file_path=str( - ECEPHY_DATA_PATH / "spikeglx" / "Noise4Sam_g0" / "Noise4Sam_g0_imec0" / "Noise4Sam_g0_t0.imec0.ap.bin" - ) + folder_path=ECEPHY_DATA_PATH / "spikeglx" / "Noise4Sam_g0" / "Noise4Sam_g0_imec0", stream_id="imec0.ap" ) save_directory = OUTPUT_PATH diff --git a/tests/test_on_data/ecephys/test_spikeglx_converter.py b/tests/test_on_data/ecephys/test_spikeglx_converter.py index af98789c1..93b228053 100644 --- a/tests/test_on_data/ecephys/test_spikeglx_converter.py +++ b/tests/test_on_data/ecephys/test_spikeglx_converter.py @@ -33,20 +33,20 @@ def assertNWBFileStructure(self, nwbfile_path: FilePath, expected_session_start_ with NWBHDF5IO(path=nwbfile_path) as io: nwbfile = io.read() - assert nwbfile.session_start_time == expected_session_start_time + assert nwbfile.session_start_time.replace(tzinfo=None) == expected_session_start_time + + assert "ElectricalSeriesAP" in nwbfile.acquisition + assert "ElectricalSeriesLF" in nwbfile.acquisition + assert "TimeSeriesNIDQ" in nwbfile.acquisition - assert "ElectricalSeriesAPImec0" in nwbfile.acquisition - assert "ElectricalSeriesLFImec0" in nwbfile.acquisition - assert "ElectricalSeriesNIDQ" in nwbfile.acquisition assert len(nwbfile.acquisition) == 3 assert "NeuropixelImec0" in nwbfile.devices assert "NIDQBoard" in nwbfile.devices assert len(nwbfile.devices) == 2 - assert "NIDQChannelGroup" in nwbfile.electrode_groups assert "Imec0" in nwbfile.electrode_groups - assert len(nwbfile.electrode_groups) == 2 + assert len(nwbfile.electrode_groups) == 1 def test_single_probe_spikeglx_converter(self): converter = SpikeGLXConverterPipe(folder_path=SPIKEGLX_PATH / "Noise4Sam_g0") @@ -63,18 +63,17 @@ def test_single_probe_spikeglx_converter(self): expected_ecephys_metadata = expected_metadata["Ecephys"] test_ecephys_metadata = test_metadata["Ecephys"] + assert test_ecephys_metadata == expected_ecephys_metadata device_metadata = test_ecephys_metadata.pop("Device") expected_device_metadata = expected_ecephys_metadata.pop("Device") assert device_metadata == expected_device_metadata - assert test_ecephys_metadata == expected_ecephys_metadata - nwbfile_path = self.tmpdir / "test_single_probe_spikeglx_converter.nwb" converter.run_conversion(nwbfile_path=nwbfile_path, metadata=metadata) - expected_session_start_time = datetime(2020, 11, 3, 10, 35, 10).astimezone() + expected_session_start_time = datetime(2020, 11, 3, 10, 35, 10) 
        self.assertNWBFileStructure(nwbfile_path=nwbfile_path, expected_session_start_time=expected_session_start_time)
 
     def test_in_converter_pipe(self):
@@ -84,7 +83,7 @@ def test_in_converter_pipe(self):
         nwbfile_path = self.tmpdir / "test_spikeglx_converter_in_converter_pipe.nwb"
         converter_pipe.run_conversion(nwbfile_path=nwbfile_path)
 
-        expected_session_start_time = datetime(2020, 11, 3, 10, 35, 10).astimezone()
+        expected_session_start_time = datetime(2020, 11, 3, 10, 35, 10)
         self.assertNWBFileStructure(nwbfile_path=nwbfile_path, expected_session_start_time=expected_session_start_time)
 
     def test_in_nwbconverter(self):
@@ -101,7 +100,7 @@ class TestConverter(NWBConverter):
         nwbfile_path = self.tmpdir / "test_spikeglx_converter_in_nwbconverter.nwb"
         converter.run_conversion(nwbfile_path=nwbfile_path)
 
-        expected_session_start_time = datetime(2020, 11, 3, 10, 35, 10).astimezone()
+        expected_session_start_time = datetime(2020, 11, 3, 10, 35, 10)
         self.assertNWBFileStructure(nwbfile_path=nwbfile_path, expected_session_start_time=expected_session_start_time)
 
@@ -118,7 +117,9 @@ def assertNWBFileStructure(self, nwbfile_path: FilePath, expected_session_start_
         with NWBHDF5IO(path=nwbfile_path) as io:
             nwbfile = io.read()
 
-            assert nwbfile.session_start_time == expected_session_start_time
+            # Do the comparison without timezone information to avoid CI timezone issues
+            # The timezone is set by pynwb automatically
+            assert nwbfile.session_start_time.replace(tzinfo=None) == expected_session_start_time
 
             # TODO: improve name of segments using 'Segment{index}' for clarity
             assert "ElectricalSeriesAPImec00" in nwbfile.acquisition
@@ -129,7 +130,7 @@ def assertNWBFileStructure(self, nwbfile_path: FilePath, expected_session_start_
             assert "ElectricalSeriesLFImec01" in nwbfile.acquisition
             assert "ElectricalSeriesLFImec10" in nwbfile.acquisition
             assert "ElectricalSeriesLFImec11" in nwbfile.acquisition
-            assert len(nwbfile.acquisition) == 8
+            assert len(nwbfile.acquisition) == 16
 
             assert "NeuropixelImec0" in nwbfile.devices
             assert "NeuropixelImec1" in nwbfile.devices
@@ -141,7 +142,7 @@ def assertNWBFileStructure(self, nwbfile_path: FilePath, expected_session_start_
 
     def test_multi_probe_spikeglx_converter(self):
         converter = SpikeGLXConverterPipe(
-            folder_path=SPIKEGLX_PATH / "multi_trigger_multi_gate" / "SpikeGLX" / "5-19-2022-CI0" / "5-19-2022-CI0_g0"
+            folder_path=SPIKEGLX_PATH / "multi_trigger_multi_gate" / "SpikeGLX" / "5-19-2022-CI0"
         )
         metadata = converter.get_metadata()
 
@@ -161,13 +162,12 @@ def test_multi_probe_spikeglx_converter(self):
         expected_device_metadata = expected_ecephys_metadata.pop("Device")
 
         assert device_metadata == expected_device_metadata
-        assert test_ecephys_metadata == expected_ecephys_metadata
 
         nwbfile_path = self.tmpdir / "test_multi_probe_spikeglx_converter.nwb"
         converter.run_conversion(nwbfile_path=nwbfile_path, metadata=metadata)
 
-        expected_session_start_time = datetime(2022, 5, 19, 17, 37, 47).astimezone()
+        expected_session_start_time = datetime(2022, 5, 19, 17, 37, 47)
 
         self.assertNWBFileStructure(nwbfile_path=nwbfile_path, expected_session_start_time=expected_session_start_time)
 
@@ -182,18 +182,8 @@ def test_electrode_table_writing(tmp_path):
 
     electrodes_table = nwbfile.electrodes
 
-    # Test NIDQ
-    electrical_series = nwbfile.acquisition["ElectricalSeriesNIDQ"]
-    nidq_electrodes_table_region = electrical_series.electrodes
-    region_indices = nidq_electrodes_table_region.data
-    recording_extractor = converter.data_interface_objects["nidq"].recording_extractor
-
-    saved_channel_names =
electrodes_table[region_indices]["channel_name"] - expected_channel_names_nidq = recording_extractor.get_property("channel_name") - np.testing.assert_array_equal(saved_channel_names, expected_channel_names_nidq) - # Test AP - electrical_series = nwbfile.acquisition["ElectricalSeriesAPImec0"] + electrical_series = nwbfile.acquisition["ElectricalSeriesAP"] ap_electrodes_table_region = electrical_series.electrodes region_indices = ap_electrodes_table_region.data recording_extractor = converter.data_interface_objects["imec0.ap"].recording_extractor @@ -203,7 +193,7 @@ def test_electrode_table_writing(tmp_path): np.testing.assert_array_equal(saved_channel_names, expected_channel_names_ap) # Test LF - electrical_series = nwbfile.acquisition["ElectricalSeriesLFImec0"] + electrical_series = nwbfile.acquisition["ElectricalSeriesLF"] lf_electrodes_table_region = electrical_series.electrodes region_indices = lf_electrodes_table_region.data recording_extractor = converter.data_interface_objects["imec0.lf"].recording_extractor @@ -222,7 +212,7 @@ def test_electrode_table_writing(tmp_path): # Test round trip with spikeinterface recording_extractor_ap = NwbRecordingExtractor( file_path=nwbfile_path, - electrical_series_name="ElectricalSeriesAPImec0", + electrical_series_name="ElectricalSeriesAP", ) channel_ids = recording_extractor_ap.get_channel_ids() @@ -230,16 +220,8 @@ def test_electrode_table_writing(tmp_path): recording_extractor_lf = NwbRecordingExtractor( file_path=nwbfile_path, - electrical_series_name="ElectricalSeriesLFImec0", + electrical_series_name="ElectricalSeriesLF", ) channel_ids = recording_extractor_lf.get_channel_ids() np.testing.assert_array_equal(channel_ids, expected_channel_names_lf) - - recording_extractor_nidq = NwbRecordingExtractor( - file_path=nwbfile_path, - electrical_series_name="ElectricalSeriesNIDQ", - ) - - channel_ids = recording_extractor_nidq.get_channel_ids() - np.testing.assert_array_equal(channel_ids, expected_channel_names_nidq) diff --git a/tests/test_on_data/test_temporal_alignment/test_temporal_alignment_methods.py b/tests/test_on_data/test_temporal_alignment/test_temporal_alignment_methods.py index c8c6bad09..081cd172d 100644 --- a/tests/test_on_data/test_temporal_alignment/test_temporal_alignment_methods.py +++ b/tests/test_on_data/test_temporal_alignment/test_temporal_alignment_methods.py @@ -75,7 +75,6 @@ def assertNWBFileTimesAligned(self, nwbfile_path: Union[str, Path]): # High level groups were written to file assert "BehaviorEvents" in nwbfile.acquisition - assert "ElectricalSeriesNIDQ" in nwbfile.acquisition assert "trials" in nwbfile.intervals # Aligned data was written diff --git a/tests/test_on_data/test_yaml/conversion_specifications/GIN_conversion_specification_dandi_upload.yml b/tests/test_on_data/test_yaml/conversion_specifications/GIN_conversion_specification_dandi_upload.yml new file mode 100644 index 000000000..adf590d3a --- /dev/null +++ b/tests/test_on_data/test_yaml/conversion_specifications/GIN_conversion_specification_dandi_upload.yml @@ -0,0 +1,66 @@ +metadata: + NWBFile: + lab: My Lab + institution: My Institution + +conversion_options: + stub_test: True + +data_interfaces: + ap: SpikeGLXRecordingInterface + lf: SpikeGLXRecordingInterface + phy: PhySortingInterface + +upload_to_dandiset: "200560" + +experiments: + ymaze: + metadata: + NWBFile: + session_description: Subject navigating a Y-shaped maze. 
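+    # Each session below defines its own source_data plus per-session NWBFile and Subject metadata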
+
+    sessions:
+      - nwbfile_name: example_converter_spec_1
+        source_data:
+          ap:
+            file_path: spikeglx/Noise4Sam_g0/Noise4Sam_g0_imec0/Noise4Sam_g0_t0.imec0.ap.bin
+        metadata:
+          NWBFile:
+            session_start_time: "2020-10-09T21:19:09+00:00"
+            session_id: "test-yaml-1"
+          Subject:
+            subject_id: "yaml-1"
+            sex: F
+            age: P35D
+            species: Mus musculus
+      - nwbfile_name: example_converter_spec_2.nwb
+        metadata:
+          NWBFile:
+            session_start_time: "2020-10-10T21:19:09+00:00"
+            session_id: "test-yaml-2"
+          Subject:
+            subject_id: "yaml-002"
+            sex: F
+            age: P35D
+            species: Mus musculus
+        source_data:
+          lf:
+            file_path: spikeglx/Noise4Sam_g0/Noise4Sam_g0_imec0/Noise4Sam_g0_t0.imec0.lf.bin
+
+  open_explore:
+    sessions:
+      - nwbfile_name: example_converter_spec_3
+        source_data:
+          lf:
+            file_path: spikeglx/Noise4Sam_g0/Noise4Sam_g0_imec0/Noise4Sam_g0_t0.imec0.lf.bin
+          phy:
+            folder_path: phy/phy_example_0/
+        metadata:
+          NWBFile:
+            session_start_time: "2020-10-11T21:19:09+00:00"
+            session_id: test YAML 3
+          Subject:
+            subject_id: YAML Subject Name
+            sex: F
+            age: P35D
+            species: Mus musculus
diff --git a/tests/test_on_data/test_yaml/neuroconv_deployment_aws_tools_tests.py b/tests/test_on_data/test_yaml/neuroconv_deployment_aws_tools_tests.py
new file mode 100644
index 000000000..f58865d26
--- /dev/null
+++ b/tests/test_on_data/test_yaml/neuroconv_deployment_aws_tools_tests.py
@@ -0,0 +1,167 @@
+import os
+import pathlib
+import time
+import unittest
+
+import boto3
+
+from neuroconv.tools.aws import deploy_neuroconv_batch_job
+
+from ..setup_paths import OUTPUT_PATH
+
+_RETRY_STATES = ["RUNNABLE", "PENDING", "STARTING", "RUNNING"]
+
+
+class TestNeuroConvDeploymentBatchJob(unittest.TestCase):
+    """
+    To allow this test to work, the developer must create a folder on the outer level of their personal Google Drive
+    called 'testing_rclone_spikeglx_and_phy' with the following structure:
+
+        testing_rclone_spikeglx_and_phy
+        ├── ci_tests
+        ├──── spikeglx
+        ├────── Noise4Sam_g0
+        ├──── phy
+        ├────── phy_example_0
+
+    Where 'Noise4Sam' is from the 'spikeglx/Noise4Sam_g0' GIN ephys dataset and 'phy_example_0' is likewise from the
+    'phy' folder of the same dataset.
+
+    Then the developer must install Rclone and call `rclone config` to generate tokens in their own `rclone.conf` file.
+    The developer can easily find the location of the config file on their system using `rclone config file`.
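+
+    After configuration, the relevant section of `rclone.conf` looks roughly like the following (the token values
+    here are placeholders; the real ones come from your own `rclone config` run):
+
+        [test_google_drive_remote]
+        type = drive
+        scope = drive
+        token = {"access_token":"...","token_type":"Bearer","refresh_token":"...","expiry":"..."}
+        team_drive =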
+ """ + + test_folder = OUTPUT_PATH / "aws_rclone_tests" + test_config_file_path = test_folder / "rclone.conf" + aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", None) + aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", None) + region = "us-east-2" + + def setUp(self): + self.test_folder.mkdir(exist_ok=True) + + # Pretend as if .conf file already exists on the system (created via interactive `rclone config` command) + token_dictionary = dict( + access_token=os.environ["RCLONE_DRIVE_ACCESS_TOKEN"], + token_type="Bearer", + refresh_token=os.environ["RCLONE_DRIVE_REFRESH_TOKEN"], + expiry=os.environ["RCLONE_EXPIRY_TOKEN"], + ) + token_string = str(token_dictionary).replace("'", '"').replace(" ", "") + rclone_config_contents = [ + "[test_google_drive_remote]\n", + "type = drive\n", + "scope = drive\n", + f"token = {token_string}\n", + "team_drive = \n", + "\n", + ] + with open(file=self.test_config_file_path, mode="w") as io: + io.writelines(rclone_config_contents) + + def test_deploy_neuroconv_batch_job(self): + region = "us-east-2" + aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", None) + aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", None) + + dynamodb_resource = boto3.resource( + service_name="dynamodb", + region_name=region, + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + ) + batch_client = boto3.client( + service_name="batch", + region_name=region, + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + ) + efs_client = boto3.client( + service_name="efs", + region_name=region, + aws_access_key_id=aws_access_key_id, + aws_secret_access_key=aws_secret_access_key, + ) + # Assume no other tests of EFS volumes are fluctuating at the same time, otherwise make this more specific + efs_volumes_before = efs_client.describe_file_systems() + + rclone_command = ( + "rclone copy test_google_drive_remote:testing_rclone_spikeglx_and_phy/ci_tests /mnt/efs/source " + "--verbose --progress --config ./rclone.conf" # TODO: should just include this in helper function? 
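+            # Copies the 'ci_tests' folder from the Google Drive remote into /mnt/efs/source inside the batch job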
+ ) + + testing_base_folder_path = pathlib.Path(__file__).parent.parent.parent + yaml_specification_file_path = ( + testing_base_folder_path + / "test_on_data" + / "test_yaml" + / "conversion_specifications" + / "GIN_conversion_specification.yml" + ) + + rclone_config_file_path = self.test_config_file_path + + job_name = "test_deploy_neuroconv_batch_job" + efs_volume_name = "test_deploy_neuroconv_batch_job" + all_info = deploy_neuroconv_batch_job( + rclone_command=rclone_command, + yaml_specification_file_path=yaml_specification_file_path, + job_name=job_name, + efs_volume_name=efs_volume_name, + rclone_config_file_path=rclone_config_file_path, + ) + + # Wait additional time for AWS to clean up resources + time.sleep(120) + + info = all_info["neuroconv_job_submission_info"] + job_id = info["job_submission_info"]["jobId"] + job = None + max_retries = 10 + retry = 0 + while retry < max_retries: + job_description_response = batch_client.describe_jobs(jobs=[job_id]) + assert job_description_response["ResponseMetadata"]["HTTPStatusCode"] == 200 + + jobs = job_description_response["jobs"] + assert len(jobs) == 1 + + job = jobs[0] + + if job["status"] in _RETRY_STATES: + retry += 1 + time.sleep(60) + else: + break + + # Check EFS cleaned up automatically + efs_volumes_after = efs_client.describe_file_systems() + assert len(efs_volumes_after["FileSystems"]) == len(efs_volumes_before["FileSystems"]) + + # Check normal job completion + expected_job_name = f"{job_name}_neuroconv_deployment" + assert job["jobName"] == expected_job_name + assert "neuroconv_batch_queue" in job["jobQueue"] + assert "fs-" in job["jobDefinition"] + assert job["status"] == "SUCCEEDED" + + status_tracker_table_name = "neuroconv_batch_status_tracker" + table = dynamodb_resource.Table(name=status_tracker_table_name) + table_submission_id = info["table_submission_info"]["id"] + + table_item_response = table.get_item(Key={"id": table_submission_id}) + assert table_item_response["ResponseMetadata"]["HTTPStatusCode"] == 200 + + table_item = table_item_response["Item"] + assert table_item["job_name"] == expected_job_name + assert table_item["job_id"] == job_id + assert table_item["status"] == "Job submitted..." 
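+        # Update the tracking table status so a reviewer of the table can see this run completed its checks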
+ + table.update_item( + Key={"id": table_submission_id}, + AttributeUpdates={"status": {"Action": "PUT", "Value": "Test passed - cleaning up..."}}, + ) + + table.update_item( + Key={"id": table_submission_id}, AttributeUpdates={"status": {"Action": "PUT", "Value": "Test passed."}} + ) diff --git a/tests/test_on_data/test_yaml/test_yaml_conversion_specification.py b/tests/test_on_data/test_yaml/test_yaml_conversion_specification.py index 61c71cf86..5a623d141 100644 --- a/tests/test_on_data/test_yaml/test_yaml_conversion_specification.py +++ b/tests/test_on_data/test_yaml/test_yaml_conversion_specification.py @@ -1,12 +1,12 @@ -import sys import unittest from datetime import datetime from pathlib import Path import pytest from hdmf.testing import TestCase -from jsonschema import RefResolver, validate +from jsonschema import validate from pynwb import NWBHDF5IO +from referencing import Registry, Resource from neuroconv import run_conversion_from_yaml from neuroconv.utils import load_dict_from_file @@ -19,6 +19,7 @@ "fname", [ "GIN_conversion_specification.yml", + "GIN_conversion_specification_dandi_upload.yml", "GIN_conversion_specification_missing_nwbfile_names.yml", "GIN_conversion_specification_no_nwbfile_name_or_other_metadata.yml", "GIN_conversion_specification_videos.yml", @@ -27,16 +28,19 @@ def test_validate_example_specifications(fname): path_to_test_yml_files = Path(__file__).parent / "conversion_specifications" schema_folder = path_to_test_yml_files.parent.parent.parent.parent / "src" / "neuroconv" / "schemas" + + # Load schemas specification_schema = load_dict_from_file(file_path=schema_folder / "yaml_conversion_specification_schema.json") - sys_uri_base = "file://" - if sys.platform.startswith("win32"): - sys_uri_base = "file:/" + metadata_schema = load_dict_from_file(file_path=schema_folder / "metadata_schema.json") + + # The yaml specification references the metadata schema, so we need to load it into the registry + registry = Registry().with_resource("metadata_schema.json", Resource.from_contents(metadata_schema)) yaml_file_path = path_to_test_yml_files / fname validate( instance=load_dict_from_file(file_path=yaml_file_path), - schema=load_dict_from_file(file_path=schema_folder / "yaml_conversion_specification_schema.json"), - resolver=RefResolver(base_uri=sys_uri_base + str(schema_folder) + "/", referrer=specification_schema), + schema=specification_schema, + registry=registry, ) diff --git a/tests/test_on_data/test_yaml/yaml_aws_tools_tests.py b/tests/test_on_data/test_yaml/yaml_aws_tools_tests.py index 7ea49e644..e767e516b 100644 --- a/tests/test_on_data/test_yaml/yaml_aws_tools_tests.py +++ b/tests/test_on_data/test_yaml/yaml_aws_tools_tests.py @@ -36,6 +36,7 @@ class TestRcloneTransferBatchJob(unittest.TestCase): aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", None) aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", None) region = "us-east-2" + efs_id = None def setUp(self): self.test_folder.mkdir(exist_ok=True) @@ -66,7 +67,9 @@ def setUp(self): aws_secret_access_key=self.aws_secret_access_key, ) - def tearDown(self): + def tearDown(self) -> None: + if self.efs_id is None: + return None efs_client = self.efs_client # Cleanup EFS after testing is complete - must clear mount targets first, then wait before deleting the volume diff --git a/tests/test_on_data/test_yaml/yaml_dandi_transfer_tools.py b/tests/test_on_data/test_yaml/yaml_dandi_transfer_tools.py new file mode 100644 index 000000000..c36d072e7 --- /dev/null +++ 
b/tests/test_on_data/test_yaml/yaml_dandi_transfer_tools.py
@@ -0,0 +1,53 @@
+import os
+import platform
+import time
+from datetime import datetime, timedelta
+from pathlib import Path
+
+import dandi.dandiapi
+import pytest
+from packaging.version import Version
+
+from neuroconv import run_conversion_from_yaml
+
+from ..setup_paths import ECEPHY_DATA_PATH, OUTPUT_PATH
+
+DANDI_API_KEY = os.getenv("DANDI_API_KEY")
+HAVE_DANDI_KEY = DANDI_API_KEY is not None and DANDI_API_KEY != ""  # can be "" from external forks
+_PYTHON_VERSION = platform.python_version()
+
+
+@pytest.mark.skipif(
+    not HAVE_DANDI_KEY or Version(".".join(_PYTHON_VERSION.split(".")[:2])) != Version("3.12"),
+    reason="You must set your DANDI_API_KEY and run on Python 3.12 to run this test!",
+)
+def test_run_conversion_from_yaml_with_dandi_upload():
+    path_to_test_yml_files = Path(__file__).parent / "conversion_specifications"
+    yaml_file_path = path_to_test_yml_files / "GIN_conversion_specification_dandi_upload.yml"
+    run_conversion_from_yaml(
+        specification_file_path=yaml_file_path,
+        data_folder_path=ECEPHY_DATA_PATH,
+        output_folder_path=OUTPUT_PATH,
+        overwrite=True,
+    )
+
+    time.sleep(60)  # Give some buffer room for server to process before making assertions against DANDI API
+
+    client = dandi.dandiapi.DandiAPIClient(api_url="https://api-staging.dandiarchive.org/api")
+    dandiset = client.get_dandiset("200560")
+
+    expected_asset_paths = [
+        "sub-yaml-1/sub-yaml-1_ses-test-yaml-1_ecephys.nwb",
+        "sub-yaml-002/sub-yaml-002_ses-test-yaml-2_ecephys.nwb",
+        "sub-YAML-Subject-Name/sub-YAML-Subject-Name_ses-test-YAML-3_ecephys.nwb",
+    ]
+    for asset_path in expected_asset_paths:
+        test_asset = dandiset.get_asset_by_path(path=asset_path)  # Will error if not found
+        test_asset_metadata = test_asset.get_raw_metadata()
+
+        # Past uploads may have created the same apparent file, so look at the modification time to ensure
+        # this test is actually testing the most recent upload
+        date_modified = datetime.fromisoformat(
+            test_asset_metadata["dateModified"].split("Z")[0]  # Timezones look a little messy
+        )
+        assert datetime.now() - date_modified < timedelta(minutes=10)