From 8e20a25ad30609aefffc5ad6f9b56354ee5e6d10 Mon Sep 17 00:00:00 2001 From: Ben Dichter Date: Wed, 21 Aug 2024 16:54:46 -0400 Subject: [PATCH] [Pydantic III] Use list/dict annotations (#1021) Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Cody Baker <51133164+CodyCBakerPhD@users.noreply.github.com> Co-authored-by: CodyCBakerPhD --- CHANGELOG.md | 1 + src/neuroconv/basedatainterface.py | 6 +- .../behavior/audio/audiointerface.py | 8 +-- .../behavior/deeplabcut/_dlc_utils.py | 4 +- .../deeplabcut/deeplabcutdatainterface.py | 4 +- .../lightningpose/lightningposeconverter.py | 6 +- .../lightningposedatainterface.py | 4 +- .../behavior/neuralynx/nvt_utils.py | 8 +-- .../behavior/video/video_utils.py | 2 +- .../behavior/video/videodatainterface.py | 14 ++-- .../baserecordingextractorinterface.py | 10 +-- .../ecephys/basesortingextractorinterface.py | 8 +-- .../neuralynx/neuralynxdatainterface.py | 10 +-- .../neuroscope/neuroscopedatainterface.py | 4 +- .../openephys/openephysbinarydatainterface.py | 4 +- .../openephys/openephyslegacydatainterface.py | 4 +- .../ecephys/phy/phydatainterface.py | 2 +- .../ecephys/spikeglx/spikeglxconverter.py | 6 +- .../ecephys/spikeglx/spikeglxnidqinterface.py | 3 +- .../icephys/abf/abfdatainterface.py | 5 +- .../icephys/baseicephysinterface.py | 3 +- .../brukertiff/brukertiffdatainterface.py | 6 +- .../text/timeintervalsinterface.py | 6 +- src/neuroconv/nwbconverter.py | 18 ++--- .../tools/aws/_submit_aws_batch_job.py | 10 +-- src/neuroconv/tools/data_transfers/_dandi.py | 4 +- src/neuroconv/tools/data_transfers/_globus.py | 14 ++-- src/neuroconv/tools/hdmf.py | 11 ++-- src/neuroconv/tools/importing.py | 8 +-- src/neuroconv/tools/neo/neo.py | 8 +-- .../_configuration_models/_base_backend.py | 16 ++--- .../_configuration_models/_base_dataset_io.py | 24 +++---- .../_configuration_models/_hdf5_backend.py | 4 +- .../_configuration_models/_hdf5_dataset_io.py | 6 +- .../_configuration_models/_zarr_backend.py | 4 +- .../_configuration_models/_zarr_dataset_io.py | 12 ++-- src/neuroconv/tools/optogenetics.py | 4 +- src/neuroconv/tools/path_expansion.py | 6 +- .../imagingextractordatachunkiterator.py | 4 +- .../tools/spikeinterface/spikeinterface.py | 66 +++++++++---------- ...pikeinterfacerecordingdatachunkiterator.py | 6 +- .../testing/_mock/_mock_dataset_models.py | 24 +++---- .../tools/testing/data_interface_mixins.py | 6 +- .../tools/testing/mock_interfaces.py | 6 +- src/neuroconv/tools/testing/mock_probes.py | 4 +- src/neuroconv/tools/text.py | 6 +- src/neuroconv/utils/json_schema.py | 12 ++-- 47 files changed, 201 insertions(+), 210 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e0f198d0..b343771a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ * `BaseRecordingInterface` now calls default metadata when metadata is not passing mimicking `run_conversion` behavior. [PR #1012](https://github.com/catalystneuro/neuroconv/pull/1012) * Added `get_json_schema_from_method_signature` which constructs Pydantic models automatically from the signature of any function with typical annotation types used throughout NeuroConv. [PR #1016](https://github.com/catalystneuro/neuroconv/pull/1016) * Replaced all interface annotations with Pydantic types. [PR #1017](https://github.com/catalystneuro/neuroconv/pull/1017) +* Changed typehint collections (e.g. `List`) to standard collections (e.g. `list`). [PR #1021](https://github.com/catalystneuro/neuroconv/pull/1021) diff --git a/src/neuroconv/basedatainterface.py b/src/neuroconv/basedatainterface.py index 0c9b9d813..4e0a0aac4 100644 --- a/src/neuroconv/basedatainterface.py +++ b/src/neuroconv/basedatainterface.py @@ -3,7 +3,7 @@ import uuid from abc import ABC, abstractmethod from pathlib import Path -from typing import Literal, Optional, Tuple, Union +from typing import Literal, Optional, Union from jsonschema.validators import validate from pydantic import FilePath @@ -30,8 +30,8 @@ class BaseDataInterface(ABC): """Abstract class defining the structure of all DataInterfaces.""" display_name: Union[str, None] = None - keywords: Tuple[str] = tuple() - associated_suffixes: Tuple[str] = tuple() + keywords: tuple[str] = tuple() + associated_suffixes: tuple[str] = tuple() info: Union[str, None] = None @classmethod diff --git a/src/neuroconv/datainterfaces/behavior/audio/audiointerface.py b/src/neuroconv/datainterfaces/behavior/audio/audiointerface.py index 532ad5b42..d61cdf18b 100644 --- a/src/neuroconv/datainterfaces/behavior/audio/audiointerface.py +++ b/src/neuroconv/datainterfaces/behavior/audio/audiointerface.py @@ -1,6 +1,6 @@ import json from pathlib import Path -from typing import List, Literal, Optional +from typing import Literal, Optional import numpy as np import scipy @@ -28,7 +28,7 @@ class AudioInterface(BaseTemporalAlignmentInterface): associated_suffixes = (".wav",) info = "Interface for writing audio recordings to an NWB file." - def __init__(self, file_paths: List[FilePath], verbose: bool = False): + def __init__(self, file_paths: list[FilePath], verbose: bool = False): """ Data interface for writing acoustic recordings to an NWB file. @@ -105,7 +105,7 @@ def get_original_timestamps(self) -> np.ndarray: def get_timestamps(self) -> Optional[np.ndarray]: raise NotImplementedError("The AudioInterface does not yet support timestamps.") - def set_aligned_timestamps(self, aligned_timestamps: List[np.ndarray]): + def set_aligned_timestamps(self, aligned_timestamps: list[np.ndarray]): raise NotImplementedError("The AudioInterface does not yet support timestamps.") def set_aligned_starting_time(self, aligned_starting_time: float): @@ -132,7 +132,7 @@ def set_aligned_starting_time(self, aligned_starting_time: float): "Please set them using 'set_aligned_segment_starting_times'." ) - def set_aligned_segment_starting_times(self, aligned_segment_starting_times: List[float]): + def set_aligned_segment_starting_times(self, aligned_segment_starting_times: list[float]): """ Align the individual starting time for each audio file in this interface relative to the common session start time. diff --git a/src/neuroconv/datainterfaces/behavior/deeplabcut/_dlc_utils.py b/src/neuroconv/datainterfaces/behavior/deeplabcut/_dlc_utils.py index 39245c307..7608fffaa 100644 --- a/src/neuroconv/datainterfaces/behavior/deeplabcut/_dlc_utils.py +++ b/src/neuroconv/datainterfaces/behavior/deeplabcut/_dlc_utils.py @@ -2,7 +2,7 @@ import pickle import warnings from pathlib import Path -from typing import List, Optional, Union +from typing import Optional, Union import numpy as np import pandas as pd @@ -305,7 +305,7 @@ def add_subject_to_nwbfile( h5file: FilePath, individual_name: str, config_file: FilePath, - timestamps: Optional[Union[List, np.ndarray]] = None, + timestamps: Optional[Union[list, np.ndarray]] = None, pose_estimation_container_kwargs: Optional[dict] = None, ) -> NWBFile: """ diff --git a/src/neuroconv/datainterfaces/behavior/deeplabcut/deeplabcutdatainterface.py b/src/neuroconv/datainterfaces/behavior/deeplabcut/deeplabcutdatainterface.py index f35f3854c..c6850b555 100644 --- a/src/neuroconv/datainterfaces/behavior/deeplabcut/deeplabcutdatainterface.py +++ b/src/neuroconv/datainterfaces/behavior/deeplabcut/deeplabcutdatainterface.py @@ -1,5 +1,5 @@ from pathlib import Path -from typing import List, Optional, Union +from typing import Optional, Union import numpy as np from pydantic import FilePath @@ -76,7 +76,7 @@ def get_timestamps(self) -> np.ndarray: "Unable to retrieve timestamps for this interface! Define the `get_timestamps` method for this interface." ) - def set_aligned_timestamps(self, aligned_timestamps: Union[List, np.ndarray]): + def set_aligned_timestamps(self, aligned_timestamps: Union[list, np.ndarray]): """ Set aligned timestamps vector for DLC data with user defined timestamps diff --git a/src/neuroconv/datainterfaces/behavior/lightningpose/lightningposeconverter.py b/src/neuroconv/datainterfaces/behavior/lightningpose/lightningposeconverter.py index 20c080a6e..114cff0dd 100644 --- a/src/neuroconv/datainterfaces/behavior/lightningpose/lightningposeconverter.py +++ b/src/neuroconv/datainterfaces/behavior/lightningpose/lightningposeconverter.py @@ -1,5 +1,5 @@ from copy import deepcopy -from typing import List, Optional +from typing import Optional from pydantic import FilePath from pynwb import NWBFile @@ -106,8 +106,8 @@ def add_to_nwbfile( reference_frame: Optional[str] = None, confidence_definition: Optional[str] = None, external_mode: bool = True, - starting_frames_original_videos: Optional[List[int]] = None, - starting_frames_labeled_videos: Optional[List[int]] = None, + starting_frames_original_videos: Optional[list[int]] = None, + starting_frames_labeled_videos: Optional[list[int]] = None, stub_test: bool = False, ): original_video_interface = self.data_interface_objects["OriginalVideo"] diff --git a/src/neuroconv/datainterfaces/behavior/lightningpose/lightningposedatainterface.py b/src/neuroconv/datainterfaces/behavior/lightningpose/lightningposedatainterface.py index b9761b2c6..b87366109 100644 --- a/src/neuroconv/datainterfaces/behavior/lightningpose/lightningposedatainterface.py +++ b/src/neuroconv/datainterfaces/behavior/lightningpose/lightningposedatainterface.py @@ -2,7 +2,7 @@ from copy import deepcopy from datetime import datetime from pathlib import Path -from typing import Optional, Tuple +from typing import Optional import numpy as np from pydantic import FilePath @@ -116,7 +116,7 @@ def _load_source_data(self): pose_estimation_data = pd.read_csv(self.file_path, header=[0, 1, 2]) return pose_estimation_data - def _get_original_video_shape(self) -> Tuple[int, int]: + def _get_original_video_shape(self) -> tuple[int, int]: with self._vc(file_path=str(self.original_video_file_path)) as video: video_shape = video.get_frame_shape() # image size of the original video is in height x width diff --git a/src/neuroconv/datainterfaces/behavior/neuralynx/nvt_utils.py b/src/neuroconv/datainterfaces/behavior/neuralynx/nvt_utils.py index 99a22e577..8d6f4268f 100644 --- a/src/neuroconv/datainterfaces/behavior/neuralynx/nvt_utils.py +++ b/src/neuroconv/datainterfaces/behavior/neuralynx/nvt_utils.py @@ -5,7 +5,7 @@ import os from datetime import datetime from shutil import copy -from typing import Dict, List, Union +from typing import Union import numpy as np from pydantic import FilePath @@ -26,7 +26,7 @@ ] -def read_header(filename: str) -> Dict[str, Union[str, datetime, float, int, List[int]]]: +def read_header(filename: str) -> dict[str, Union[str, datetime, float, int, list[int]]]: """ Parses a Neuralynx NVT File Header and returns it as a dictionary. @@ -83,7 +83,7 @@ def parse_bool(x): return out -def read_data(filename: str) -> Dict[str, np.ndarray]: +def read_data(filename: str) -> dict[str, np.ndarray]: """ Reads a NeuroLynx NVT file and returns its data. @@ -97,7 +97,7 @@ def read_data(filename: str) -> Dict[str, np.ndarray]: Returns ------- - Dict[str, np.ndarray] + dict[str, np.ndarray] Dictionary containing the parsed data. Raises diff --git a/src/neuroconv/datainterfaces/behavior/video/video_utils.py b/src/neuroconv/datainterfaces/behavior/video/video_utils.py index 78c66472a..a8f2412aa 100644 --- a/src/neuroconv/datainterfaces/behavior/video/video_utils.py +++ b/src/neuroconv/datainterfaces/behavior/video/video_utils.py @@ -222,7 +222,7 @@ def _get_frame_details(self): min_frame_size_mb = (math.prod(frame_shape) * self._get_dtype().itemsize) / 1e6 return min_frame_size_mb, frame_shape - def _get_data(self, selection: Tuple[slice]) -> np.ndarray: + def _get_data(self, selection: tuple[slice]) -> np.ndarray: start_frame = selection[0].start end_frame = selection[0].stop frames = np.empty(shape=[end_frame - start_frame, *self._maxshape[1:]]) diff --git a/src/neuroconv/datainterfaces/behavior/video/videodatainterface.py b/src/neuroconv/datainterfaces/behavior/video/videodatainterface.py index ca651e597..dfb22deba 100644 --- a/src/neuroconv/datainterfaces/behavior/video/videodatainterface.py +++ b/src/neuroconv/datainterfaces/behavior/video/videodatainterface.py @@ -1,7 +1,7 @@ import warnings from copy import deepcopy from pathlib import Path -from typing import List, Literal, Optional +from typing import Literal, Optional import numpy as np import psutil @@ -30,7 +30,7 @@ class VideoInterface(BaseDataInterface): def __init__( self, - file_paths: List[FilePath], + file_paths: list[FilePath], verbose: bool = False, *, metadata_key_name: str = "Videos", @@ -104,7 +104,7 @@ def get_metadata(self): return metadata - def get_original_timestamps(self, stub_test: bool = False) -> List[np.ndarray]: + def get_original_timestamps(self, stub_test: bool = False) -> list[np.ndarray]: """ Retrieve the original unaltered timestamps for the data in this interface. @@ -159,7 +159,7 @@ def get_timing_type(self) -> Literal["starting_time and rate", "timestamps"]: "Please specify the temporal alignment of each video." ) - def get_timestamps(self, stub_test: bool = False) -> List[np.ndarray]: + def get_timestamps(self, stub_test: bool = False) -> list[np.ndarray]: """ Retrieve the timestamps for the data in this interface. @@ -176,7 +176,7 @@ def get_timestamps(self, stub_test: bool = False) -> List[np.ndarray]: """ return self._timestamps or self.get_original_timestamps(stub_test=stub_test) - def set_aligned_timestamps(self, aligned_timestamps: List[np.ndarray]): + def set_aligned_timestamps(self, aligned_timestamps: list[np.ndarray]): """ Replace all timestamps for this interface with those aligned to the common session start time. @@ -221,7 +221,7 @@ def set_aligned_starting_time(self, aligned_starting_time: float, stub_test: boo else: raise ValueError("There are no timestamps or starting times set to shift by a common value!") - def set_aligned_segment_starting_times(self, aligned_segment_starting_times: List[float], stub_test: bool = False): + def set_aligned_segment_starting_times(self, aligned_segment_starting_times: list[float], stub_test: bool = False): """ Align the individual starting time for each video (segment) in this interface relative to the common session start time. @@ -264,7 +264,7 @@ def add_to_nwbfile( metadata: Optional[dict] = None, stub_test: bool = False, external_mode: bool = True, - starting_frames: Optional[List[int]] = None, + starting_frames: Optional[list[int]] = None, chunk_data: bool = True, module_name: Optional[str] = None, module_description: Optional[str] = None, diff --git a/src/neuroconv/datainterfaces/ecephys/baserecordingextractorinterface.py b/src/neuroconv/datainterfaces/ecephys/baserecordingextractorinterface.py index b21210d77..354d78f80 100644 --- a/src/neuroconv/datainterfaces/ecephys/baserecordingextractorinterface.py +++ b/src/neuroconv/datainterfaces/ecephys/baserecordingextractorinterface.py @@ -1,4 +1,4 @@ -from typing import List, Literal, Optional, Union +from typing import Literal, Optional, Union import numpy as np from pynwb import NWBFile @@ -106,7 +106,7 @@ def get_metadata(self) -> DeepDict: return metadata - def get_original_timestamps(self) -> Union[np.ndarray, List[np.ndarray]]: + def get_original_timestamps(self) -> Union[np.ndarray, list[np.ndarray]]: """ Retrieve the original unaltered timestamps for the data in this interface. @@ -128,7 +128,7 @@ def get_original_timestamps(self) -> Union[np.ndarray, List[np.ndarray]]: for segment_index in range(self._number_of_segments) ] - def get_timestamps(self) -> Union[np.ndarray, List[np.ndarray]]: + def get_timestamps(self) -> Union[np.ndarray, list[np.ndarray]]: """ Retrieve the timestamps for the data in this interface. @@ -152,7 +152,7 @@ def set_aligned_timestamps(self, aligned_timestamps: np.ndarray): self.recording_extractor.set_times(times=aligned_timestamps) - def set_aligned_segment_timestamps(self, aligned_segment_timestamps: List[np.ndarray]): + def set_aligned_segment_timestamps(self, aligned_segment_timestamps: list[np.ndarray]): """ Replace all timestamps for all segments in this interface with those aligned to the common session start time. @@ -185,7 +185,7 @@ def set_aligned_starting_time(self, aligned_starting_time: float): ] ) - def set_aligned_segment_starting_times(self, aligned_segment_starting_times: List[float]): + def set_aligned_segment_starting_times(self, aligned_segment_starting_times: list[float]): """ Align the starting time for each segment in this interface relative to the common session start time. diff --git a/src/neuroconv/datainterfaces/ecephys/basesortingextractorinterface.py b/src/neuroconv/datainterfaces/ecephys/basesortingextractorinterface.py index 9c3c06849..b3cd25d24 100644 --- a/src/neuroconv/datainterfaces/ecephys/basesortingextractorinterface.py +++ b/src/neuroconv/datainterfaces/ecephys/basesortingextractorinterface.py @@ -1,5 +1,5 @@ from copy import deepcopy -from typing import List, Literal, Optional, Union +from typing import Literal, Optional, Union import numpy as np from pynwb import NWBFile @@ -83,7 +83,7 @@ def get_original_timestamps(self) -> np.ndarray: "Unable to fetch original timestamps for a SortingInterface since it relies upon an attached recording." ) - def get_timestamps(self) -> Union[np.ndarray, List[np.ndarray]]: + def get_timestamps(self) -> Union[np.ndarray, list[np.ndarray]]: if not self.sorting_extractor.has_recording(): raise NotImplementedError( "In order to align timestamps for a SortingInterface, it must have a recording " @@ -138,7 +138,7 @@ def set_aligned_timestamps(self, aligned_timestamps: np.ndarray): times=aligned_timestamps[segment_index], segment_index=segment_index ) - def set_aligned_segment_timestamps(self, aligned_segment_timestamps: List[np.ndarray]): + def set_aligned_segment_timestamps(self, aligned_segment_timestamps: list[np.ndarray]): """ Replace all timestamps for all segments in this interface with those aligned to the common session start time. @@ -182,7 +182,7 @@ def set_aligned_starting_time(self, aligned_starting_time: float): else: sorting_segment._t_start += aligned_starting_time - def set_aligned_segment_starting_times(self, aligned_segment_starting_times: List[float]): + def set_aligned_segment_starting_times(self, aligned_segment_starting_times: list[float]): """ Align the starting time for each segment in this interface relative to the common session start time. diff --git a/src/neuroconv/datainterfaces/ecephys/neuralynx/neuralynxdatainterface.py b/src/neuroconv/datainterfaces/ecephys/neuralynx/neuralynxdatainterface.py index b999944e1..da0573249 100644 --- a/src/neuroconv/datainterfaces/ecephys/neuralynx/neuralynxdatainterface.py +++ b/src/neuroconv/datainterfaces/ecephys/neuralynx/neuralynxdatainterface.py @@ -1,5 +1,5 @@ import json -from typing import List, Optional +from typing import Optional import numpy as np from pydantic import DirectoryPath @@ -18,7 +18,7 @@ class NeuralynxRecordingInterface(BaseRecordingExtractorInterface): info = "Interface for Neuralynx recording data." @classmethod - def get_stream_names(cls, folder_path: DirectoryPath) -> List[str]: + def get_stream_names(cls, folder_path: DirectoryPath) -> list[str]: from spikeinterface.extractors import NeuralynxRecordingExtractor stream_names, _ = NeuralynxRecordingExtractor.get_streams(folder_path=folder_path) @@ -158,16 +158,16 @@ def extract_neo_header_metadata(neo_reader) -> dict: return common_header -def _dict_intersection(dict_list: List) -> dict: +def _dict_intersection(dict_list: list[dict]) -> dict: """ Intersect dict_list and return only common keys and values Parameters ---------- - dict_list: list of dicitionaries each representing a header + dict_list: list of dictionaries each representing a header Returns ------- dict: - Dictionary containing key-value pairs common to all input dicitionary_list + Dictionary containing key-value pairs common to all input dictionary_list """ # Collect keys appearing in all dictionaries diff --git a/src/neuroconv/datainterfaces/ecephys/neuroscope/neuroscopedatainterface.py b/src/neuroconv/datainterfaces/ecephys/neuroscope/neuroscopedatainterface.py index d3f9fdfb4..d68532a94 100644 --- a/src/neuroconv/datainterfaces/ecephys/neuroscope/neuroscopedatainterface.py +++ b/src/neuroconv/datainterfaces/ecephys/neuroscope/neuroscopedatainterface.py @@ -269,7 +269,7 @@ def __init__( self, folder_path: DirectoryPath, keep_mua_units: bool = True, - exclude_shanks: Optional[list] = None, + exclude_shanks: Optional[list[int]] = None, xml_file_path: Optional[FilePath] = None, verbose: bool = True, ): @@ -282,7 +282,7 @@ def __init__( Path to folder containing .clu and .res files. keep_mua_units : bool, default: True Optional. Whether to return sorted spikes from multi-unit activity. - exclude_shanks : list, optional + exclude_shanks : list of integers, optional List of indices to ignore. The set of all possible indices is chosen by default, extracted as the final integer of all the .res.%i and .clu.%i pairs. xml_file_path : FilePathType, optional diff --git a/src/neuroconv/datainterfaces/ecephys/openephys/openephysbinarydatainterface.py b/src/neuroconv/datainterfaces/ecephys/openephys/openephysbinarydatainterface.py index ccbd3cbbd..371b96f94 100644 --- a/src/neuroconv/datainterfaces/ecephys/openephys/openephysbinarydatainterface.py +++ b/src/neuroconv/datainterfaces/ecephys/openephys/openephysbinarydatainterface.py @@ -1,4 +1,4 @@ -from typing import List, Optional +from typing import Optional from pydantic import DirectoryPath @@ -20,7 +20,7 @@ class OpenEphysBinaryRecordingInterface(BaseRecordingExtractorInterface): ExtractorName = "OpenEphysBinaryRecordingExtractor" @classmethod - def get_stream_names(cls, folder_path: DirectoryPath) -> List[str]: + def get_stream_names(cls, folder_path: DirectoryPath) -> list[str]: from spikeinterface.extractors import OpenEphysBinaryRecordingExtractor stream_names, _ = OpenEphysBinaryRecordingExtractor.get_streams(folder_path=folder_path) diff --git a/src/neuroconv/datainterfaces/ecephys/openephys/openephyslegacydatainterface.py b/src/neuroconv/datainterfaces/ecephys/openephys/openephyslegacydatainterface.py index 3403705b1..0818257a7 100644 --- a/src/neuroconv/datainterfaces/ecephys/openephys/openephyslegacydatainterface.py +++ b/src/neuroconv/datainterfaces/ecephys/openephys/openephyslegacydatainterface.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import List, Optional +from typing import Optional from warnings import warn from pydantic import DirectoryPath @@ -19,7 +19,7 @@ class OpenEphysLegacyRecordingInterface(BaseRecordingExtractorInterface): info = "Interface for converting legacy OpenEphys recording data." @classmethod - def get_stream_names(cls, folder_path: DirectoryPath) -> List[str]: + def get_stream_names(cls, folder_path: DirectoryPath) -> list[str]: from spikeinterface.extractors import OpenEphysLegacyRecordingExtractor stream_names, _ = OpenEphysLegacyRecordingExtractor.get_streams(folder_path=folder_path) diff --git a/src/neuroconv/datainterfaces/ecephys/phy/phydatainterface.py b/src/neuroconv/datainterfaces/ecephys/phy/phydatainterface.py index e5d250a6e..da5ebbfc2 100644 --- a/src/neuroconv/datainterfaces/ecephys/phy/phydatainterface.py +++ b/src/neuroconv/datainterfaces/ecephys/phy/phydatainterface.py @@ -27,7 +27,7 @@ def get_source_schema(cls) -> dict: def __init__( self, folder_path: DirectoryPath, - exclude_cluster_groups: Optional[list] = None, + exclude_cluster_groups: Optional[list[str]] = None, verbose: bool = True, ): """ diff --git a/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxconverter.py b/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxconverter.py index 35a4d8881..9d40cde3d 100644 --- a/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxconverter.py +++ b/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxconverter.py @@ -1,5 +1,5 @@ from pathlib import Path -from typing import List, Optional +from typing import Optional from pydantic import DirectoryPath @@ -28,7 +28,7 @@ def get_source_schema(cls): return source_schema @classmethod - def get_streams(cls, folder_path: DirectoryPath) -> List[str]: + def get_streams(cls, folder_path: DirectoryPath) -> list[str]: from spikeinterface.extractors import SpikeGLXRecordingExtractor return SpikeGLXRecordingExtractor.get_streams(folder_path=folder_path)[0] @@ -36,7 +36,7 @@ def get_streams(cls, folder_path: DirectoryPath) -> List[str]: def __init__( self, folder_path: DirectoryPath, - streams: Optional[List[str]] = None, + streams: Optional[list[str]] = None, verbose: bool = False, ): """ diff --git a/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxnidqinterface.py b/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxnidqinterface.py index 7cff7f11f..3c0b886ec 100644 --- a/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxnidqinterface.py +++ b/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxnidqinterface.py @@ -1,5 +1,4 @@ from pathlib import Path -from typing import List import numpy as np from pydantic import FilePath @@ -93,7 +92,7 @@ def get_metadata(self) -> dict: ] = "Raw acquisition traces from the NIDQ (.nidq.bin) channels." return metadata - def get_channel_names(self) -> List[str]: + def get_channel_names(self) -> list[str]: """Return a list of channel names as set in the recording extractor.""" return list(self.recording_extractor.get_channel_ids()) diff --git a/src/neuroconv/datainterfaces/icephys/abf/abfdatainterface.py b/src/neuroconv/datainterfaces/icephys/abf/abfdatainterface.py index 10ec3e8a3..5336b6116 100644 --- a/src/neuroconv/datainterfaces/icephys/abf/abfdatainterface.py +++ b/src/neuroconv/datainterfaces/icephys/abf/abfdatainterface.py @@ -1,7 +1,6 @@ import json from datetime import datetime, timedelta from pathlib import Path -from typing import List from warnings import warn from pydantic import FilePath @@ -52,7 +51,7 @@ def get_source_schema(cls) -> dict: return source_schema def __init__( - self, file_paths: List[FilePath], icephys_metadata: dict = None, icephys_metadata_file_path: FilePath = None + self, file_paths: list[FilePath], icephys_metadata: dict = None, icephys_metadata_file_path: FilePath = None ): """ ABF IcephysInterface based on Neo AxonIO. @@ -161,7 +160,7 @@ def set_aligned_starting_time(self, aligned_starting_time: float): reader._t_starts[segment_index] += aligned_starting_time def set_aligned_segment_starting_times( - self, aligned_segment_starting_times: List[List[float]], stub_test: bool = False + self, aligned_segment_starting_times: list[list[float]], stub_test: bool = False ): """ Align the individual starting time for each video in this interface relative to the common session start time. diff --git a/src/neuroconv/datainterfaces/icephys/baseicephysinterface.py b/src/neuroconv/datainterfaces/icephys/baseicephysinterface.py index 341fc4c0d..ff82ba391 100644 --- a/src/neuroconv/datainterfaces/icephys/baseicephysinterface.py +++ b/src/neuroconv/datainterfaces/icephys/baseicephysinterface.py @@ -1,5 +1,4 @@ import importlib.util -from typing import Tuple import numpy as np from pynwb import NWBFile @@ -92,7 +91,7 @@ def add_to_nwbfile( nwbfile: NWBFile, metadata: dict = None, icephys_experiment_type: str = "voltage_clamp", - skip_electrodes: Tuple[int] = (), + skip_electrodes: tuple[int] = (), ): """ Primary function for converting raw (unprocessed) intracellular data to the NWB standard. diff --git a/src/neuroconv/datainterfaces/ophys/brukertiff/brukertiffdatainterface.py b/src/neuroconv/datainterfaces/ophys/brukertiff/brukertiffdatainterface.py index d92676219..9742711e1 100644 --- a/src/neuroconv/datainterfaces/ophys/brukertiff/brukertiffdatainterface.py +++ b/src/neuroconv/datainterfaces/ophys/brukertiff/brukertiffdatainterface.py @@ -1,4 +1,4 @@ -from typing import List, Literal, Optional +from typing import Literal, Optional from dateutil.parser import parse from pydantic import DirectoryPath @@ -64,7 +64,7 @@ def __init__( self._stream_name = self.imaging_extractor.stream_name.replace("_", "") self._image_size = self.imaging_extractor.get_image_size() - def _determine_position_current(self) -> List[float]: + def _determine_position_current(self) -> list[float]: """ Returns y, x, and z position values. The unit of values is in the microscope reference frame. """ @@ -222,7 +222,7 @@ def __init__( self._stream_name = self.imaging_extractor.stream_name.replace("_", "") self._image_size = self.imaging_extractor.get_image_size() - def _determine_position_current(self) -> List[float]: + def _determine_position_current(self) -> list[float]: """ Returns y, x, and z position values. The unit of values is in the microscope reference frame. """ diff --git a/src/neuroconv/datainterfaces/text/timeintervalsinterface.py b/src/neuroconv/datainterfaces/text/timeintervalsinterface.py index ad55852c5..b00779cb0 100644 --- a/src/neuroconv/datainterfaces/text/timeintervalsinterface.py +++ b/src/neuroconv/datainterfaces/text/timeintervalsinterface.py @@ -1,6 +1,6 @@ from abc import abstractmethod from pathlib import Path -from typing import Dict, Optional +from typing import Optional import numpy as np from pydantic import FilePath @@ -120,8 +120,8 @@ def add_to_nwbfile( nwbfile: NWBFile, metadata: Optional[dict] = None, tag: str = "trials", - column_name_mapping: Dict[str, str] = None, - column_descriptions: Dict[str, str] = None, + column_name_mapping: dict[str, str] = None, + column_descriptions: dict[str, str] = None, ) -> NWBFile: """ Run the NWB conversion for the instantiated data interface. diff --git a/src/neuroconv/nwbconverter.py b/src/neuroconv/nwbconverter.py index 2f5074d6a..ea7b7cd67 100644 --- a/src/neuroconv/nwbconverter.py +++ b/src/neuroconv/nwbconverter.py @@ -4,7 +4,7 @@ import warnings from collections import Counter from pathlib import Path -from typing import Dict, List, Literal, Optional, Tuple, Union +from typing import Literal, Optional, Union from jsonschema import validate from pydantic import FilePath @@ -36,8 +36,8 @@ class NWBConverter: """Primary class for all NWB conversion classes.""" display_name: Union[str, None] = None - keywords: Tuple[str] = tuple() - associated_suffixes: Tuple[str] = tuple() + keywords: tuple[str] = tuple() + associated_suffixes: tuple[str] = tuple() info: Union[str, None] = None data_interface_classes = None @@ -57,11 +57,11 @@ def get_source_schema(cls) -> dict: return source_schema @classmethod - def validate_source(cls, source_data: Dict[str, dict], verbose: bool = True): + def validate_source(cls, source_data: dict[str, dict], verbose: bool = True): """Validate source_data against Converter source_schema.""" cls._validate_source_data(source_data=source_data, verbose=verbose) - def _validate_source_data(self, source_data: Dict[str, dict], verbose: bool = True): + def _validate_source_data(self, source_data: dict[str, dict], verbose: bool = True): encoder = NWBSourceDataEncoder() # The encoder produces a serialized object, so we deserialized it for comparison @@ -73,7 +73,7 @@ def _validate_source_data(self, source_data: Dict[str, dict], verbose: bool = Tr if verbose: print("Source data is valid!") - def __init__(self, source_data: Dict[str, dict], verbose: bool = True): + def __init__(self, source_data: dict[str, dict], verbose: bool = True): """Validate source_data against source_schema and initialize all data interfaces.""" self.verbose = verbose self._validate_source_data(source_data=source_data, verbose=self.verbose) @@ -102,7 +102,7 @@ def get_metadata(self) -> DeepDict: metadata = dict_deep_update(metadata, interface_metadata) return metadata - def validate_metadata(self, metadata: Dict[str, dict], append_mode: bool = False): + def validate_metadata(self, metadata: dict[str, dict], append_mode: bool = False): """Validate metadata against Converter metadata_schema.""" encoder = NWBMetaDataEncoder() # The encoder produces a serialized object, so we deserialized it for comparison @@ -135,7 +135,7 @@ def get_conversion_options_schema(self) -> dict: return conversion_options_schema - def validate_conversion_options(self, conversion_options: Dict[str, dict]): + def validate_conversion_options(self, conversion_options: dict[str, dict]): """Validate conversion_options against Converter conversion_options_schema.""" validate(instance=conversion_options or {}, schema=self.get_conversion_options_schema()) if self.verbose: @@ -289,7 +289,7 @@ def get_source_schema(cls) -> dict: def validate_source(cls): raise NotImplementedError("Source data not available with previously initialized classes.") - def __init__(self, data_interfaces: Union[List[BaseDataInterface], Dict[str, BaseDataInterface]], verbose=True): + def __init__(self, data_interfaces: Union[list[BaseDataInterface], dict[str, BaseDataInterface]], verbose=True): self.verbose = verbose if isinstance(data_interfaces, list): # Create unique names for each interface diff --git a/src/neuroconv/tools/aws/_submit_aws_batch_job.py b/src/neuroconv/tools/aws/_submit_aws_batch_job.py index 9b4dfe81a..0d36bee7f 100644 --- a/src/neuroconv/tools/aws/_submit_aws_batch_job.py +++ b/src/neuroconv/tools/aws/_submit_aws_batch_job.py @@ -4,7 +4,7 @@ import os import time from datetime import datetime -from typing import Dict, List, Optional +from typing import Optional from uuid import uuid4 @@ -12,9 +12,9 @@ def submit_aws_batch_job( *, job_name: str, docker_image: str, - commands: Optional[List[str]] = None, - environment_variables: Optional[Dict[str, str]] = None, - job_dependencies: Optional[List[Dict[str, str]]] = None, + commands: Optional[list[str]] = None, + environment_variables: Optional[dict[str, str]] = None, + job_dependencies: Optional[list[dict[str, str]]] = None, status_tracker_table_name: str = "neuroconv_batch_status_tracker", iam_role_name: str = "neuroconv_batch_role", compute_environment_name: str = "neuroconv_batch_environment", @@ -24,7 +24,7 @@ def submit_aws_batch_job( minimum_worker_cpus: int = 4, submission_id: Optional[str] = None, region: Optional[str] = None, -) -> Dict[str, str]: +) -> dict[str, str]: """ Submit a job to AWS Batch for processing. diff --git a/src/neuroconv/tools/data_transfers/_dandi.py b/src/neuroconv/tools/data_transfers/_dandi.py index f67f43197..11d38b0cb 100644 --- a/src/neuroconv/tools/data_transfers/_dandi.py +++ b/src/neuroconv/tools/data_transfers/_dandi.py @@ -4,7 +4,7 @@ from pathlib import Path from shutil import rmtree from tempfile import mkdtemp -from typing import List, Optional, Union +from typing import Optional, Union from warnings import warn from pydantic import DirectoryPath @@ -20,7 +20,7 @@ def automatic_dandi_upload( cleanup: bool = False, number_of_jobs: Union[int, None] = None, number_of_threads: Union[int, None] = None, -) -> List[Path]: +) -> list[Path]: """ Fully automated upload of NWB files to a Dandiset. diff --git a/src/neuroconv/tools/data_transfers/_globus.py b/src/neuroconv/tools/data_transfers/_globus.py index 3429127f1..62bb654ed 100644 --- a/src/neuroconv/tools/data_transfers/_globus.py +++ b/src/neuroconv/tools/data_transfers/_globus.py @@ -4,7 +4,7 @@ import re from pathlib import Path from time import sleep, time -from typing import Dict, List, Tuple, Union +from typing import Union from pydantic import DirectoryPath from tqdm import tqdm @@ -15,7 +15,7 @@ def get_globus_dataset_content_sizes( globus_endpoint_id: str, path: str, recursive: bool = True, timeout: float = 120.0 -) -> Dict[str, int]: # pragma: no cover +) -> dict[str, int]: # pragma: no cover """ May require external login via 'globus login' from CLI. @@ -35,13 +35,13 @@ def get_globus_dataset_content_sizes( def transfer_globus_content( source_endpoint_id: str, - source_files: Union[str, List[List[str]]], + source_files: Union[str, list[list[str]]], destination_endpoint_id: str, destination_folder: DirectoryPath, display_progress: bool = True, progress_update_rate: float = 60.0, progress_update_timeout: float = 600.0, -) -> Tuple[bool, List[str]]: # pragma: no cover +) -> tuple[bool, list[str]]: # pragma: no cover """ Track progress for transferring content from source_endpoint_id to destination_endpoint_id:destination_folder. @@ -81,10 +81,10 @@ def transfer_globus_content( def _submit_transfer_request( source_endpoint_id: str, - source_files: Union[str, List[List[str]]], + source_files: Union[str, list[list[str]]], destination_endpoint_id: str, destination_folder_path: Path, - ) -> Dict[str, int]: + ) -> dict[str, int]: """Send transfer request to Globus.""" folder_content_sizes = dict() task_total_sizes = dict() @@ -134,7 +134,7 @@ def _submit_transfer_request( return task_total_sizes def _track_transfer( - task_total_sizes: Dict[str, int], + task_total_sizes: dict[str, int], display_progress: bool = True, progress_update_rate: float = 60.0, progress_update_timeout: float = 600.0, diff --git a/src/neuroconv/tools/hdmf.py b/src/neuroconv/tools/hdmf.py index 64db7b66e..e8dfff294 100644 --- a/src/neuroconv/tools/hdmf.py +++ b/src/neuroconv/tools/hdmf.py @@ -2,21 +2,20 @@ import math import warnings -from typing import Tuple import numpy as np from hdmf.data_utils import GenericDataChunkIterator as HDMFGenericDataChunkIterator class GenericDataChunkIterator(HDMFGenericDataChunkIterator): - def _get_default_buffer_shape(self, buffer_gb: float = 1.0) -> Tuple[int]: + def _get_default_buffer_shape(self, buffer_gb: float = 1.0) -> tuple[int]: return self.estimate_default_buffer_shape( buffer_gb=buffer_gb, chunk_shape=self.chunk_shape, maxshape=self.maxshape, dtype=self.dtype ) # TODO: move this to the core iterator in HDMF so it can be easily swapped out as well as run on its own @staticmethod - def estimate_default_chunk_shape(chunk_mb: float, maxshape: Tuple[int, ...], dtype: np.dtype) -> Tuple[int, ...]: + def estimate_default_chunk_shape(chunk_mb: float, maxshape: tuple[int, ...], dtype: np.dtype) -> tuple[int, ...]: """ Select chunk shape with size in MB less than the threshold of chunk_mb. @@ -47,8 +46,8 @@ def estimate_default_chunk_shape(chunk_mb: float, maxshape: Tuple[int, ...], dty # TODO: move this to the core iterator in HDMF so it can be easily swapped out as well as run on its own @staticmethod def estimate_default_buffer_shape( - buffer_gb: float, chunk_shape: Tuple[int, ...], maxshape: Tuple[int, ...], dtype: np.dtype - ) -> Tuple[int, ...]: + buffer_gb: float, chunk_shape: tuple[int, ...], maxshape: tuple[int, ...], dtype: np.dtype + ) -> tuple[int, ...]: # Elevate any overflow warnings to trigger error. # This is usually an indicator of something going terribly wrong with the estimation calculations and should be # avoided at all costs. @@ -149,5 +148,5 @@ def _get_dtype(self) -> np.dtype: def _get_maxshape(self) -> tuple: return self.data.shape - def _get_data(self, selection: Tuple[slice]) -> np.ndarray: + def _get_data(self, selection: tuple[slice]) -> np.ndarray: return self.data[selection] diff --git a/src/neuroconv/tools/importing.py b/src/neuroconv/tools/importing.py index 3b4e67b9d..04a3cbf21 100644 --- a/src/neuroconv/tools/importing.py +++ b/src/neuroconv/tools/importing.py @@ -6,7 +6,7 @@ from importlib.util import find_spec from platform import processor, python_version from types import ModuleType -from typing import Dict, List, Optional, Tuple, Union +from typing import Optional, Union from packaging import version @@ -41,8 +41,8 @@ def is_package_installed(package_name: str) -> bool: def get_package( package_name: str, installation_instructions: Optional[str] = None, - excluded_python_versions: Optional[List[str]] = None, - excluded_platforms_and_python_versions: Optional[Dict[str, Union[List[str], Dict[str, List[str]]]]] = None, + excluded_python_versions: Optional[list[str]] = None, + excluded_platforms_and_python_versions: Optional[dict[str, Union[list[str], dict[str, list[str]]]]] = None, ) -> ModuleType: """ Check if package is installed and return module if so. @@ -128,7 +128,7 @@ def get_package( ) -def get_format_summaries() -> Dict[str, Dict[str, Union[str, Tuple[str, ...], None]]]: +def get_format_summaries() -> dict[str, dict[str, Union[str, tuple[str, ...], None]]]: """Simple helper function for compiling high level summaries of all format interfaces and converters.""" # Local scope import to avoid circularity from ..converters import converter_list diff --git a/src/neuroconv/tools/neo/neo.py b/src/neuroconv/tools/neo/neo.py index 8873359db..220c64de0 100644 --- a/src/neuroconv/tools/neo/neo.py +++ b/src/neuroconv/tools/neo/neo.py @@ -3,7 +3,7 @@ import warnings from copy import deepcopy from pathlib import Path -from typing import Optional, Tuple +from typing import Optional import neo.io.baseio import numpy as np @@ -65,7 +65,7 @@ def get_number_of_segments(neo_reader, block: int = 0) -> int: return neo_reader.header["nb_segment"][block] -def get_command_traces(neo_reader, segment: int = 0, cmd_channel: int = 0) -> Tuple[list, str, str]: +def get_command_traces(neo_reader, segment: int = 0, cmd_channel: int = 0) -> tuple[list, str, str]: """ Get command traces (e.g. voltage clamp command traces). @@ -213,7 +213,7 @@ def add_icephys_recordings( metadata: dict = None, icephys_experiment_type: str = "voltage_clamp", stimulus_type: str = "not described", - skip_electrodes: Tuple[int] = (), + skip_electrodes: tuple[int] = (), compression: Optional[str] = None, # TODO: remove completely after 10/1/2024 ): """ @@ -383,7 +383,7 @@ def add_neo_to_nwb( compression: Optional[str] = None, # TODO: remove completely after 10/1/2024 icephys_experiment_type: str = "voltage_clamp", stimulus_type: Optional[str] = None, - skip_electrodes: Tuple[int] = (), + skip_electrodes: tuple[int] = (), ): """ Auxiliary static method for nwbextractor. diff --git a/src/neuroconv/tools/nwb_helpers/_configuration_models/_base_backend.py b/src/neuroconv/tools/nwb_helpers/_configuration_models/_base_backend.py index 2c07a1bb0..2b0cc507e 100644 --- a/src/neuroconv/tools/nwb_helpers/_configuration_models/_base_backend.py +++ b/src/neuroconv/tools/nwb_helpers/_configuration_models/_base_backend.py @@ -1,6 +1,6 @@ """Base Pydantic models for DatasetInfo and DatasetConfiguration.""" -from typing import Any, ClassVar, Dict, Literal, Type +from typing import Any, ClassVar, Literal, Type from hdmf.container import DataIO from pydantic import BaseModel, ConfigDict, Field @@ -21,7 +21,7 @@ class BackendConfiguration(BaseModel): model_config = ConfigDict(validate_assignment=True) # Re-validate model on mutation - dataset_configurations: Dict[str, DatasetIOConfiguration] = Field( + dataset_configurations: dict[str, DatasetIOConfiguration] = Field( description=( "A mapping from object locations (e.g. `acquisition/TestElectricalSeriesAP/data`) " "to their DatasetConfiguration specification that contains all information " @@ -42,15 +42,15 @@ def __str__(self) -> str: # Pydantic models have several API calls for retrieving the schema - override all of them to work @classmethod - def schema(cls, **kwargs) -> Dict[str, Any]: + def schema(cls, **kwargs) -> dict[str, Any]: return cls.model_json_schema(**kwargs) @classmethod - def schema_json(cls, **kwargs) -> Dict[str, Any]: + def schema_json(cls, **kwargs) -> dict[str, Any]: return cls.model_json_schema(**kwargs) @classmethod - def model_json_schema(cls, **kwargs) -> Dict[str, Any]: + def model_json_schema(cls, **kwargs) -> dict[str, Any]: assert "mode" not in kwargs, "The 'mode' of this method is fixed to be 'validation' and cannot be changed." assert "schema_generator" not in kwargs, "The 'schema_generator' of this method cannot be changed." return super().model_json_schema(mode="validation", schema_generator=PureJSONSchemaGenerator, **kwargs) @@ -65,7 +65,7 @@ def from_nwbfile(cls, nwbfile: NWBFile) -> Self: return cls(dataset_configurations=dataset_configurations) - def find_locations_requiring_remapping(self, nwbfile: NWBFile) -> Dict[str, DatasetIOConfiguration]: + def find_locations_requiring_remapping(self, nwbfile: NWBFile) -> dict[str, DatasetIOConfiguration]: """ Find locations of objects with mismatched IDs in the file. @@ -80,7 +80,7 @@ def find_locations_requiring_remapping(self, nwbfile: NWBFile) -> Dict[str, Data Returns ------- - Dict[str, DatasetIOConfiguration] + dict[str, DatasetIOConfiguration] A dictionary where: * Keys: Locations in the NWB of objects with mismatched IDs. * Values: New `DatasetIOConfiguration` objects corresponding to the updated object IDs. @@ -127,7 +127,7 @@ def find_locations_requiring_remapping(self, nwbfile: NWBFile) -> Dict[str, Data def build_remapped_backend( self, - locations_to_remap: Dict[str, DatasetIOConfiguration], + locations_to_remap: dict[str, DatasetIOConfiguration], ) -> Self: """ Build a remapped backend configuration by updating mismatched object IDs. diff --git a/src/neuroconv/tools/nwb_helpers/_configuration_models/_base_dataset_io.py b/src/neuroconv/tools/nwb_helpers/_configuration_models/_base_dataset_io.py index 01e291034..8b40e9a9e 100644 --- a/src/neuroconv/tools/nwb_helpers/_configuration_models/_base_dataset_io.py +++ b/src/neuroconv/tools/nwb_helpers/_configuration_models/_base_dataset_io.py @@ -2,7 +2,7 @@ import math from abc import ABC, abstractmethod -from typing import Any, Dict, List, Literal, Tuple, Union +from typing import Any, Literal, Union import h5py import numcodecs @@ -56,7 +56,7 @@ def _find_location_in_memory_nwbfile(neurodata_object: Container, field_name: st return _recursively_find_location_in_memory_nwbfile(current_location=field_name, neurodata_object=neurodata_object) -def _infer_dtype_of_list(list_: List[Union[int, float, list]]) -> np.dtype: +def _infer_dtype_of_list(list_: list[Union[int, float, list]]) -> np.dtype: """ Attempt to infer the dtype of values in an arbitrarily sized and nested list. @@ -103,16 +103,16 @@ class DatasetIOConfiguration(BaseModel, ABC): ) dataset_name: Literal["data", "timestamps"] = Field(description="The reference name of the dataset.", frozen=True) dtype: InstanceOf[np.dtype] = Field(description="The data type of elements of this dataset.", frozen=True) - full_shape: Tuple[int, ...] = Field(description="The maximum shape of the entire dataset.", frozen=True) + full_shape: tuple[int, ...] = Field(description="The maximum shape of the entire dataset.", frozen=True) # User specifiable fields - chunk_shape: Union[Tuple[PositiveInt, ...], None] = Field( + chunk_shape: Union[tuple[PositiveInt, ...], None] = Field( description=( "The specified shape to use when chunking the dataset. " "For optimized streaming speeds, a total size of around 10 MB is recommended." ), ) - buffer_shape: Union[Tuple[int, ...], None] = Field( + buffer_shape: Union[tuple[int, ...], None] = Field( description=( "The specified shape to use when iteratively loading data into memory while writing the dataset. " "For optimized writing speeds and minimal RAM usage, a total size of around 1 GB is recommended." @@ -123,12 +123,12 @@ class DatasetIOConfiguration(BaseModel, ABC): ] = Field( description="The specified compression method to apply to this dataset. Set to `None` to disable compression.", ) - compression_options: Union[Dict[str, Any], None] = Field( + compression_options: Union[dict[str, Any], None] = Field( default=None, description="The optional parameters to use for the specified compression method." ) @abstractmethod - def get_data_io_kwargs(self) -> Dict[str, Any]: + def get_data_io_kwargs(self) -> dict[str, Any]: """ Fetch the properly structured dictionary of input arguments. @@ -142,7 +142,7 @@ def __str__(self) -> str: Reason being two-fold; a standard `repr` is intended to be slightly more machine-readable / a more basic representation of the true object state. But then also because an iterable of these objects, such as a - `List[DatasetConfiguration]`, would print out the nested representations, which only look good when using the + `list[DatasetConfiguration]`, would print out the nested representations, which only look good when using the basic `repr` (that is, this fancy string print-out does not look good when nested in another container). """ size_in_bytes = math.prod(self.full_shape) * self.dtype.itemsize @@ -174,7 +174,7 @@ def __str__(self) -> str: return string @model_validator(mode="before") - def validate_all_shapes(cls, values: Dict[str, Any]) -> Dict[str, Any]: + def validate_all_shapes(cls, values: dict[str, Any]) -> dict[str, Any]: location_in_file = values["location_in_file"] dataset_name = values["dataset_name"] @@ -231,15 +231,15 @@ def validate_all_shapes(cls, values: Dict[str, Any]) -> Dict[str, Any]: # Pydantic models have several API calls for retrieving the schema - override all of them to work @classmethod - def schema(cls, **kwargs) -> Dict[str, Any]: + def schema(cls, **kwargs) -> dict[str, Any]: return cls.model_json_schema(**kwargs) @classmethod - def schema_json(cls, **kwargs) -> Dict[str, Any]: + def schema_json(cls, **kwargs) -> dict[str, Any]: return cls.model_json_schema(**kwargs) @classmethod - def model_json_schema(cls, **kwargs) -> Dict[str, Any]: + def model_json_schema(cls, **kwargs) -> dict[str, Any]: assert "mode" not in kwargs, "The 'mode' of this method is fixed to be 'validation' and cannot be changed." assert "schema_generator" not in kwargs, "The 'schema_generator' of this method cannot be changed." return super().model_json_schema(mode="validation", schema_generator=PureJSONSchemaGenerator, **kwargs) diff --git a/src/neuroconv/tools/nwb_helpers/_configuration_models/_hdf5_backend.py b/src/neuroconv/tools/nwb_helpers/_configuration_models/_hdf5_backend.py index f85d388b7..011b2e26d 100644 --- a/src/neuroconv/tools/nwb_helpers/_configuration_models/_hdf5_backend.py +++ b/src/neuroconv/tools/nwb_helpers/_configuration_models/_hdf5_backend.py @@ -1,6 +1,6 @@ """Base Pydantic models for the HDF5DatasetConfiguration.""" -from typing import ClassVar, Dict, Literal, Type +from typing import ClassVar, Literal, Type from pydantic import Field from pynwb import H5DataIO @@ -16,7 +16,7 @@ class HDF5BackendConfiguration(BackendConfiguration): pretty_backend_name: ClassVar[Literal["HDF5"]] = "HDF5" data_io_class: ClassVar[Type[H5DataIO]] = H5DataIO - dataset_configurations: Dict[str, HDF5DatasetIOConfiguration] = Field( + dataset_configurations: dict[str, HDF5DatasetIOConfiguration] = Field( description=( "A mapping from object locations to their HDF5DatasetConfiguration specification that contains all " "information for writing the datasets to disk using the HDF5 backend." diff --git a/src/neuroconv/tools/nwb_helpers/_configuration_models/_hdf5_dataset_io.py b/src/neuroconv/tools/nwb_helpers/_configuration_models/_hdf5_dataset_io.py index 828a37998..44c7660ab 100644 --- a/src/neuroconv/tools/nwb_helpers/_configuration_models/_hdf5_dataset_io.py +++ b/src/neuroconv/tools/nwb_helpers/_configuration_models/_hdf5_dataset_io.py @@ -1,6 +1,6 @@ """Base Pydantic models for the HDF5DatasetConfiguration.""" -from typing import Any, Dict, Literal, Union +from typing import Any, Literal, Union import h5py from pydantic import Field, InstanceOf @@ -45,11 +45,11 @@ class HDF5DatasetIOConfiguration(DatasetIOConfiguration): ) # TODO: actually provide better schematic rendering of options. Only support defaults in GUIDE for now. # Looks like they'll have to be hand-typed however... Can try parsing the google docstrings - no annotation typing. - compression_options: Union[Dict[str, Any], None] = Field( + compression_options: Union[dict[str, Any], None] = Field( default=None, description="The optional parameters to use for the specified compression method." ) - def get_data_io_kwargs(self) -> Dict[str, Any]: + def get_data_io_kwargs(self) -> dict[str, Any]: if is_package_installed(package_name="hdf5plugin"): import hdf5plugin diff --git a/src/neuroconv/tools/nwb_helpers/_configuration_models/_zarr_backend.py b/src/neuroconv/tools/nwb_helpers/_configuration_models/_zarr_backend.py index abd5bdd67..ee26e0553 100644 --- a/src/neuroconv/tools/nwb_helpers/_configuration_models/_zarr_backend.py +++ b/src/neuroconv/tools/nwb_helpers/_configuration_models/_zarr_backend.py @@ -1,6 +1,6 @@ """Base Pydantic models for the ZarrDatasetConfiguration.""" -from typing import ClassVar, Dict, Literal, Type +from typing import ClassVar, Literal, Type import psutil from hdmf_zarr import ZarrDataIO @@ -17,7 +17,7 @@ class ZarrBackendConfiguration(BackendConfiguration): pretty_backend_name: ClassVar[Literal["Zarr"]] = "Zarr" data_io_class: ClassVar[Type[ZarrDataIO]] = ZarrDataIO - dataset_configurations: Dict[str, ZarrDatasetIOConfiguration] = Field( + dataset_configurations: dict[str, ZarrDatasetIOConfiguration] = Field( description=( "A mapping from object locations to their ZarrDatasetConfiguration specification that contains all " "information for writing the datasets to disk using the Zarr backend." diff --git a/src/neuroconv/tools/nwb_helpers/_configuration_models/_zarr_dataset_io.py b/src/neuroconv/tools/nwb_helpers/_configuration_models/_zarr_dataset_io.py index c070a20e9..48b7c070b 100644 --- a/src/neuroconv/tools/nwb_helpers/_configuration_models/_zarr_dataset_io.py +++ b/src/neuroconv/tools/nwb_helpers/_configuration_models/_zarr_dataset_io.py @@ -1,6 +1,6 @@ """Base Pydantic models for the ZarrDatasetConfiguration.""" -from typing import Any, Dict, List, Literal, Union +from typing import Any, Literal, Union import numcodecs import zarr @@ -58,11 +58,11 @@ class ZarrDatasetIOConfiguration(DatasetIOConfiguration): ) # TODO: actually provide better schematic rendering of options. Only support defaults in GUIDE for now. # Looks like they'll have to be hand-typed however... Can try parsing the numpy docstrings - no annotation typing. - compression_options: Union[Dict[str, Any], None] = Field( + compression_options: Union[dict[str, Any], None] = Field( default=None, description="The optional parameters to use for the specified compression method." ) filter_methods: Union[ - List[Union[Literal[tuple(AVAILABLE_ZARR_COMPRESSION_METHODS.keys())], InstanceOf[numcodecs.abc.Codec]]], None + list[Union[Literal[tuple(AVAILABLE_ZARR_COMPRESSION_METHODS.keys())], InstanceOf[numcodecs.abc.Codec]]], None ] = Field( default=None, description=( @@ -72,7 +72,7 @@ class ZarrDatasetIOConfiguration(DatasetIOConfiguration): "Set to `None` to disable filtering." ), ) - filter_options: Union[List[Dict[str, Any]], None] = Field( + filter_options: Union[list[dict[str, Any]], None] = Field( default=None, description="The optional parameters to use for each specified filter method." ) @@ -88,7 +88,7 @@ def __str__(self) -> str: # Inherited docstring from parent. noqa: D105 return string @model_validator(mode="before") - def validate_filter_methods_and_options_length_match(cls, values: Dict[str, Any]): + def validate_filter_methods_and_options_length_match(cls, values: dict[str, Any]): filter_methods = values.get("filter_methods", None) filter_options = values.get("filter_options", None) @@ -110,7 +110,7 @@ def validate_filter_methods_and_options_length_match(cls, values: Dict[str, Any] return values - def get_data_io_kwargs(self) -> Dict[str, Any]: + def get_data_io_kwargs(self) -> dict[str, Any]: filters = None if self.filter_methods: filters = list() diff --git a/src/neuroconv/tools/optogenetics.py b/src/neuroconv/tools/optogenetics.py index c4249f3ee..f8ed27015 100644 --- a/src/neuroconv/tools/optogenetics.py +++ b/src/neuroconv/tools/optogenetics.py @@ -1,5 +1,3 @@ -from typing import Tuple - import numpy as np @@ -10,7 +8,7 @@ def create_optogenetic_stimulation_timeseries( frequency: float, pulse_width: float, power: float, -) -> Tuple[np.ndarray, np.ndarray]: +) -> tuple[np.ndarray, np.ndarray]: """Create a continuous stimulation time series from stimulation onset times and parameters. In the resulting data array, the offset time of each pulse is represented by a 0 power value. diff --git a/src/neuroconv/tools/path_expansion.py b/src/neuroconv/tools/path_expansion.py index b2dd367f4..427a33a9e 100644 --- a/src/neuroconv/tools/path_expansion.py +++ b/src/neuroconv/tools/path_expansion.py @@ -4,7 +4,7 @@ import os from datetime import date, datetime from pathlib import Path -from typing import Dict, Iterable, List +from typing import Iterable from parse import parse from pydantic import DirectoryPath, FilePath @@ -34,7 +34,7 @@ def extract_metadata(self, base_directory: DirectoryPath, format_: str): Yields ------ - Tuple[Path, Dict[str, Any]] + tuple[Path, dict[str, Any]] A tuple containing the file path as a `Path` object and a dictionary of the named metadata extracted from the file path. """ @@ -67,7 +67,7 @@ def list_directory(self, base_directory: DirectoryPath) -> Iterable[FilePath]: """ pass - def expand_paths(self, source_data_spec: Dict[str, dict]) -> List[DeepDict]: + def expand_paths(self, source_data_spec: dict[str, dict]) -> list[DeepDict]: """ Match paths in a directory to specs and extract metadata from the paths. diff --git a/src/neuroconv/tools/roiextractors/imagingextractordatachunkiterator.py b/src/neuroconv/tools/roiextractors/imagingextractordatachunkiterator.py index 98001dd8e..0792e2caf 100644 --- a/src/neuroconv/tools/roiextractors/imagingextractordatachunkiterator.py +++ b/src/neuroconv/tools/roiextractors/imagingextractordatachunkiterator.py @@ -1,7 +1,7 @@ """General purpose iterator for all ImagingExtractor data.""" import math -from typing import Optional, Tuple +from typing import Optional import numpy as np from hdmf.data_utils import GenericDataChunkIterator @@ -138,7 +138,7 @@ def _get_maxshape(self) -> tuple: video_shape += (depth,) return video_shape - def _get_data(self, selection: Tuple[slice]) -> np.ndarray: + def _get_data(self, selection: tuple[slice]) -> np.ndarray: data = self.imaging_extractor.get_video( start_frame=selection[0].start, end_frame=selection[0].stop, diff --git a/src/neuroconv/tools/spikeinterface/spikeinterface.py b/src/neuroconv/tools/spikeinterface/spikeinterface.py index c3ad28813..262e1eaa8 100644 --- a/src/neuroconv/tools/spikeinterface/spikeinterface.py +++ b/src/neuroconv/tools/spikeinterface/spikeinterface.py @@ -1,7 +1,7 @@ import uuid import warnings from collections import defaultdict -from typing import Any, List, Literal, Optional, Union +from typing import Any, Literal, Optional, Union import numpy as np import psutil @@ -261,7 +261,7 @@ def _get_group_name(recording: BaseRecording) -> np.ndarray: return group_names -def _get_electrodes_table_global_ids(nwbfile: pynwb.NWBFile) -> List[str]: +def _get_electrodes_table_global_ids(nwbfile: pynwb.NWBFile) -> list[str]: """ Generate a list of global identifiers for channels in the electrode table of an NWB file. @@ -274,7 +274,7 @@ def _get_electrodes_table_global_ids(nwbfile: pynwb.NWBFile) -> List[str]: Returns ------- - List[str] + list[str] A list of unique keys, each representing a combination of channel name and group name from the electrodes table. If the electrodes table or the necessary columns are not present, an empty list is returned. @@ -293,7 +293,7 @@ def _get_electrodes_table_global_ids(nwbfile: pynwb.NWBFile) -> List[str]: return unique_keys -def _get_electrode_table_indices_for_recording(recording: BaseRecording, nwbfile: pynwb.NWBFile) -> List[int]: +def _get_electrode_table_indices_for_recording(recording: BaseRecording, nwbfile: pynwb.NWBFile) -> list[int]: """ Get the indices of the electrodes in the NWBFile that correspond to the channels in the recording. @@ -311,7 +311,7 @@ def _get_electrode_table_indices_for_recording(recording: BaseRecording, nwbfile Returns ------- - List[int] + list[int] A list of indices corresponding to the positions in the NWBFile's electrodes table that match the channels in the recording. """ @@ -1316,9 +1316,9 @@ def write_recording_to_nwbfile( def add_units_table( sorting: BaseSorting, nwbfile: pynwb.NWBFile, - unit_ids: Optional[List[Union[str, int]]] = None, + unit_ids: Optional[list[Union[str, int]]] = None, property_descriptions: Optional[dict] = None, - skip_properties: Optional[List[str]] = None, + skip_properties: Optional[list[str]] = None, units_table_name: str = "units", unit_table_description: str = "Autogenerated by neuroconv.", write_in_processing_module: bool = False, @@ -1355,9 +1355,9 @@ def add_units_table( def add_units_table_to_nwbfile( sorting: BaseSorting, nwbfile: pynwb.NWBFile, - unit_ids: Optional[List[Union[str, int]]] = None, + unit_ids: Optional[list[Union[str, int]]] = None, property_descriptions: Optional[dict] = None, - skip_properties: Optional[List[str]] = None, + skip_properties: Optional[list[str]] = None, units_table_name: str = "units", unit_table_description: Optional[str] = None, write_in_processing_module: bool = False, @@ -1614,10 +1614,10 @@ def add_units_table_to_nwbfile( def add_sorting( sorting: BaseSorting, nwbfile: Optional[pynwb.NWBFile] = None, - unit_ids: Optional[Union[List[str], List[int]]] = None, + unit_ids: Optional[Union[list[str], list[int]]] = None, property_descriptions: Optional[dict] = None, - skip_properties: Optional[List[str]] = None, - skip_features: Optional[List[str]] = None, + skip_properties: Optional[list[str]] = None, + skip_features: Optional[list[str]] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", units_description: str = "Autogenerated by neuroconv.", @@ -1653,10 +1653,10 @@ def add_sorting( def add_sorting_to_nwbfile( sorting: BaseSorting, nwbfile: Optional[pynwb.NWBFile] = None, - unit_ids: Optional[Union[List[str], List[int]]] = None, + unit_ids: Optional[Union[list[str], list[int]]] = None, property_descriptions: Optional[dict] = None, - skip_properties: Optional[List[str]] = None, - skip_features: Optional[List[str]] = None, + skip_properties: Optional[list[str]] = None, + skip_features: Optional[list[str]] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", units_description: str = "Autogenerated by neuroconv.", @@ -1735,10 +1735,10 @@ def write_sorting( metadata: Optional[dict] = None, overwrite: bool = False, verbose: bool = True, - unit_ids: Optional[List[Union[str, int]]] = None, + unit_ids: Optional[list[Union[str, int]]] = None, property_descriptions: Optional[dict] = None, - skip_properties: Optional[List[str]] = None, - skip_features: Optional[List[str]] = None, + skip_properties: Optional[list[str]] = None, + skip_features: Optional[list[str]] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", units_description: str = "Autogenerated by neuroconv.", @@ -1782,10 +1782,10 @@ def write_sorting_to_nwbfile( metadata: Optional[dict] = None, overwrite: bool = False, verbose: bool = True, - unit_ids: Optional[List[Union[str, int]]] = None, + unit_ids: Optional[list[Union[str, int]]] = None, property_descriptions: Optional[dict] = None, - skip_properties: Optional[List[str]] = None, - skip_features: Optional[List[str]] = None, + skip_properties: Optional[list[str]] = None, + skip_features: Optional[list[str]] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", units_description: str = "Autogenerated by neuroconv.", @@ -1868,8 +1868,8 @@ def add_sorting_analyzer( nwbfile: Optional[pynwb.NWBFile] = None, metadata: Optional[dict] = None, recording: Optional[BaseRecording] = None, - unit_ids: Optional[Union[List[str], List[int]]] = None, - skip_properties: Optional[List[str]] = None, + unit_ids: Optional[Union[list[str], list[int]]] = None, + skip_properties: Optional[list[str]] = None, property_descriptions: Optional[dict] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", @@ -1903,8 +1903,8 @@ def add_sorting_analyzer_to_nwbfile( nwbfile: Optional[pynwb.NWBFile] = None, metadata: Optional[dict] = None, recording: Optional[BaseRecording] = None, - unit_ids: Optional[Union[List[str], List[int]]] = None, - skip_properties: Optional[List[str]] = None, + unit_ids: Optional[Union[list[str], list[int]]] = None, + skip_properties: Optional[list[str]] = None, property_descriptions: Optional[dict] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", @@ -2017,10 +2017,10 @@ def write_sorting_analyzer( overwrite: bool = False, recording: Optional[BaseRecording] = None, verbose: bool = True, - unit_ids: Optional[Union[List[str], List[int]]] = None, + unit_ids: Optional[Union[list[str], list[int]]] = None, write_electrical_series: bool = False, add_electrical_series_kwargs: Optional[dict] = None, - skip_properties: Optional[List[str]] = None, + skip_properties: Optional[list[str]] = None, property_descriptions: Optional[dict] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", @@ -2062,10 +2062,10 @@ def write_sorting_analyzer_to_nwbfile( overwrite: bool = False, recording: Optional[BaseRecording] = None, verbose: bool = True, - unit_ids: Optional[Union[List[str], List[int]]] = None, + unit_ids: Optional[Union[list[str], list[int]]] = None, write_electrical_series: bool = False, add_electrical_series_kwargs: Optional[dict] = None, - skip_properties: Optional[List[str]] = None, + skip_properties: Optional[list[str]] = None, property_descriptions: Optional[dict] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", @@ -2169,10 +2169,10 @@ def write_waveforms( overwrite: bool = False, recording: Optional[BaseRecording] = None, verbose: bool = True, - unit_ids: Optional[List[Union[str, int]]] = None, + unit_ids: Optional[list[Union[str, int]]] = None, write_electrical_series: bool = False, add_electrical_series_kwargs: Optional[dict] = None, - skip_properties: Optional[List[str]] = None, + skip_properties: Optional[list[str]] = None, property_descriptions: Optional[dict] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", @@ -2196,8 +2196,8 @@ def add_waveforms( nwbfile: Optional[pynwb.NWBFile] = None, metadata: Optional[dict] = None, recording: Optional[BaseRecording] = None, - unit_ids: Optional[List[Union[str, int]]] = None, - skip_properties: Optional[List[str]] = None, + unit_ids: Optional[list[Union[str, int]]] = None, + skip_properties: Optional[list[str]] = None, property_descriptions: Optional[dict] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", diff --git a/src/neuroconv/tools/spikeinterface/spikeinterfacerecordingdatachunkiterator.py b/src/neuroconv/tools/spikeinterface/spikeinterfacerecordingdatachunkiterator.py index e6909a2e5..95b14fc23 100644 --- a/src/neuroconv/tools/spikeinterface/spikeinterfacerecordingdatachunkiterator.py +++ b/src/neuroconv/tools/spikeinterface/spikeinterfacerecordingdatachunkiterator.py @@ -1,4 +1,4 @@ -from typing import Iterable, Optional, Tuple +from typing import Iterable, Optional from hdmf.data_utils import GenericDataChunkIterator from spikeinterface import BaseRecording @@ -77,7 +77,7 @@ def __init__( progress_bar_options=progress_bar_options, ) - def _get_default_chunk_shape(self, chunk_mb: float = 10.0) -> Tuple[int, int]: + def _get_default_chunk_shape(self, chunk_mb: float = 10.0) -> tuple[int, int]: assert chunk_mb > 0, f"chunk_mb ({chunk_mb}) must be greater than zero!" chunk_channels = min( @@ -91,7 +91,7 @@ def _get_default_chunk_shape(self, chunk_mb: float = 10.0) -> Tuple[int, int]: return (chunk_frames, chunk_channels) - def _get_data(self, selection: Tuple[slice]) -> Iterable: + def _get_data(self, selection: tuple[slice]) -> Iterable: return self.recording.get_traces( segment_index=self.segment_index, channel_ids=self.channel_ids[selection[1]], diff --git a/src/neuroconv/tools/testing/_mock/_mock_dataset_models.py b/src/neuroconv/tools/testing/_mock/_mock_dataset_models.py index 77901f220..6f69f9e93 100644 --- a/src/neuroconv/tools/testing/_mock/_mock_dataset_models.py +++ b/src/neuroconv/tools/testing/_mock/_mock_dataset_models.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, Iterable, Literal, Tuple, Union +from typing import Any, Iterable, Literal, Union import h5py import numcodecs @@ -18,14 +18,14 @@ def mock_HDF5DatasetIOConfiguration( object_id: str = "481a0860-3a0c-40ec-b931-df4a3e9b101f", location_in_file: str = "acquisition/TestElectricalSeries/data", dataset_name: Literal["data", "timestamps"] = "data", - full_shape: Tuple[int, ...] = (60 * 30_000, 384), # ~1 minute of v1 NeuroPixels probe + full_shape: tuple[int, ...] = (60 * 30_000, 384), # ~1 minute of v1 NeuroPixels probe dtype: np.dtype = np.dtype("int16"), - chunk_shape: Tuple[int, ...] = (78_125, 64), # ~10 MB - buffer_shape: Tuple[int, ...] = (1_250_000, 384), # ~1 GB + chunk_shape: tuple[int, ...] = (78_125, 64), # ~10 MB + buffer_shape: tuple[int, ...] = (1_250_000, 384), # ~1 GB compression_method: Union[ Literal[tuple(AVAILABLE_HDF5_COMPRESSION_METHODS.keys())], h5py._hl.filters.FilterRefBase, None ] = "gzip", - compression_options: Union[Dict[str, Any], None] = None, + compression_options: Union[dict[str, Any], None] = None, ) -> HDF5DatasetIOConfiguration: """Mock object of a HDF5DatasetIOConfiguration with NeuroPixel-like values to show chunk/buffer recommendations.""" return HDF5DatasetIOConfiguration( @@ -45,18 +45,18 @@ def mock_ZarrDatasetIOConfiguration( object_id: str = "481a0860-3a0c-40ec-b931-df4a3e9b101f", location_in_file: str = "acquisition/TestElectricalSeries/data", dataset_name: Literal["data", "timestamps"] = "data", - full_shape: Tuple[int, ...] = (60 * 30_000, 384), # ~1 minute of v1 NeuroPixels probe + full_shape: tuple[int, ...] = (60 * 30_000, 384), # ~1 minute of v1 NeuroPixels probe dtype: np.dtype = np.dtype("int16"), - chunk_shape: Tuple[int, ...] = (78_125, 64), # ~10 MB - buffer_shape: Tuple[int, ...] = (1_250_000, 384), # ~1 GB + chunk_shape: tuple[int, ...] = (78_125, 64), # ~10 MB + buffer_shape: tuple[int, ...] = (1_250_000, 384), # ~1 GB compression_method: Union[ Literal[tuple(AVAILABLE_ZARR_COMPRESSION_METHODS.keys())], numcodecs.abc.Codec, None ] = "gzip", - compression_options: Union[Dict[str, Any]] = None, + compression_options: Union[dict[str, Any]] = None, filter_methods: Iterable[ Union[Literal[tuple(AVAILABLE_ZARR_COMPRESSION_METHODS.keys())], numcodecs.abc.Codec, None] ] = None, - filter_options: Union[Iterable[Dict[str, Any]], None] = None, + filter_options: Union[Iterable[dict[str, Any]], None] = None, ) -> ZarrDatasetIOConfiguration: """Mock object of a ZarrDatasetIOConfiguration with NeuroPixel-like values to show chunk/buffer recommendations.""" return ZarrDatasetIOConfiguration( @@ -76,7 +76,7 @@ def mock_ZarrDatasetIOConfiguration( def mock_HDF5BackendConfiguration() -> HDF5BackendConfiguration: """Mock instance of a HDF5BackendConfiguration with two NeuroPixel-like datasets.""" - dataset_configurations: Dict[str, HDF5DatasetIOConfiguration] = { + dataset_configurations: dict[str, HDF5DatasetIOConfiguration] = { "acquisition/TestElectricalSeriesAP/data": mock_HDF5DatasetIOConfiguration( location_in_file="acquisition/TestElectricalSeriesAP/data", dataset_name="data" ), @@ -97,7 +97,7 @@ def mock_HDF5BackendConfiguration() -> HDF5BackendConfiguration: def mock_ZarrBackendConfiguration() -> ZarrBackendConfiguration: """Mock instance of a HDF5BackendConfiguration with several NeuroPixel-like datasets.""" - dataset_configurations: Dict[str, ZarrDatasetIOConfiguration] = { + dataset_configurations: dict[str, ZarrDatasetIOConfiguration] = { "acquisition/TestElectricalSeriesAP/data": mock_ZarrDatasetIOConfiguration( location_in_file="acquisition/TestElectricalSeriesAP/data", dataset_name="data", diff --git a/src/neuroconv/tools/testing/data_interface_mixins.py b/src/neuroconv/tools/testing/data_interface_mixins.py index 578f3688c..b923851c2 100644 --- a/src/neuroconv/tools/testing/data_interface_mixins.py +++ b/src/neuroconv/tools/testing/data_interface_mixins.py @@ -5,7 +5,7 @@ from copy import deepcopy from datetime import datetime from pathlib import Path -from typing import List, Literal, Optional, Type, Union +from typing import Literal, Optional, Type, Union import numpy as np from hdmf.testing import TestCase as HDMFTestCase @@ -63,7 +63,7 @@ class DataInterfaceTestMixin: """ data_interface_cls: Type[BaseDataInterface] - interface_kwargs: Union[dict, List[dict]] + interface_kwargs: Union[dict, list[dict]] save_directory: Path = Path(tempfile.mkdtemp()) conversion_options: dict = dict() maxDiff = None @@ -260,7 +260,7 @@ class TemporalAlignmentMixin: """ data_interface_cls: Type[BaseDataInterface] - interface_kwargs: Union[dict, List[dict]] + interface_kwargs: Union[dict, list[dict]] maxDiff = None def setUpFreshInterface(self): diff --git a/src/neuroconv/tools/testing/mock_interfaces.py b/src/neuroconv/tools/testing/mock_interfaces.py index 902e805f4..6f91e775f 100644 --- a/src/neuroconv/tools/testing/mock_interfaces.py +++ b/src/neuroconv/tools/testing/mock_interfaces.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import List, Literal, Optional +from typing import Literal, Optional import numpy as np from pynwb import NWBFile @@ -65,7 +65,7 @@ def get_source_schema(cls) -> dict: return source_schema def __init__( - self, signal_duration: float = 7.0, ttl_times: Optional[List[List[float]]] = None, ttl_duration: float = 1.0 + self, signal_duration: float = 7.0, ttl_times: Optional[list[list[float]]] = None, ttl_duration: float = 1.0 ): """ Define a mock SpikeGLXNIDQInterface by overriding the recording extractor to be a mock TTL signal. @@ -128,7 +128,7 @@ def __init__( self, num_channels: int = 4, sampling_frequency: float = 30_000.0, - # durations: Tuple[float] = (1.0,), # Uncomment when pydantic is integrated for schema validation + # durations: tuple[float] = (1.0,), # Uncomment when pydantic is integrated for schema validation durations: tuple = (1.0,), seed: int = 0, verbose: bool = True, diff --git a/src/neuroconv/tools/testing/mock_probes.py b/src/neuroconv/tools/testing/mock_probes.py index f5a3cea88..8b41d0f9c 100644 --- a/src/neuroconv/tools/testing/mock_probes.py +++ b/src/neuroconv/tools/testing/mock_probes.py @@ -1,5 +1,3 @@ -from typing import List - import numpy as np @@ -7,7 +5,7 @@ def generate_mock_probe(num_channels: int, num_shanks: int = 3): import probeinterface as pi # The shank ids will be 0, 0, 0, ..., 1, 1, 1, ..., 2, 2, 2, ... - shank_ids: List[int] = [] + shank_ids: list[int] = [] positions = np.zeros((num_channels, 2)) # ceil division channels_per_shank = (num_channels + num_shanks - 1) // num_shanks diff --git a/src/neuroconv/tools/text.py b/src/neuroconv/tools/text.py index 8c5b84410..b47ff2215 100644 --- a/src/neuroconv/tools/text.py +++ b/src/neuroconv/tools/text.py @@ -1,5 +1,3 @@ -from typing import Dict - import numpy as np import pandas as pd from pynwb.epoch import TimeIntervals @@ -9,8 +7,8 @@ def convert_df_to_time_intervals( df: pd.DataFrame, table_name: str = "trials", table_description: str = "experimental trials", - column_name_mapping: Dict[str, str] = None, - column_descriptions: Dict[str, str] = None, + column_name_mapping: dict[str, str] = None, + column_descriptions: dict[str, str] = None, ) -> TimeIntervals: """ Convert a dataframe to a TimeIntervals object. diff --git a/src/neuroconv/utils/json_schema.py b/src/neuroconv/utils/json_schema.py index e31a71c58..73aa97bdf 100644 --- a/src/neuroconv/utils/json_schema.py +++ b/src/neuroconv/utils/json_schema.py @@ -4,7 +4,7 @@ import warnings from datetime import datetime from pathlib import Path -from typing import Any, Callable, Dict, List, Optional +from typing import Any, Callable, Optional import docstring_parser import hdmf.data_utils @@ -48,8 +48,8 @@ def get_base_schema( tag: Optional[str] = None, root: bool = False, id_: Optional[str] = None, - required: Optional[List] = None, - properties: Optional[Dict] = None, + required: Optional[list[str]] = None, + properties: Optional[dict] = None, **kwargs, ) -> dict: """Return the base schema used for all other schemas.""" @@ -69,7 +69,7 @@ def get_base_schema( return base_schema -def get_schema_from_method_signature(method: Callable, exclude: Optional[List[str]] = None) -> dict: +def get_schema_from_method_signature(method: Callable, exclude: Optional[list[str]] = None) -> dict: """Deprecated version of `get_json_schema_from_method_signature`.""" message = ( "The method `get_schema_from_method_signature` is now named `get_json_schema_from_method_signature`." @@ -80,7 +80,7 @@ def get_schema_from_method_signature(method: Callable, exclude: Optional[List[st return get_json_schema_from_method_signature(method=method, exclude=exclude) -def get_json_schema_from_method_signature(method: Callable, exclude: Optional[List[str]] = None) -> dict: +def get_json_schema_from_method_signature(method: Callable, exclude: Optional[list[str]] = None) -> dict: """ Get the equivalent JSON schema for a signature of a method. @@ -326,7 +326,7 @@ def get_metadata_schema_for_icephys(): return schema -def validate_metadata(metadata: Dict[str, dict], schema: Dict[str, dict], verbose: bool = False): +def validate_metadata(metadata: dict[str, dict], schema: dict[str, dict], verbose: bool = False): """Validate metadata against a schema.""" encoder = NWBMetaDataEncoder() # The encoder produces a serialized object, so we deserialized it for comparison