diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 13c28f363..a90515312 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -22,3 +22,9 @@ repos:
     - id: codespell
       additional_dependencies:
         - tomli
+
+- repo: https://github.com/pre-commit/mirrors-mypy
+  rev: v1.11.2
+  hooks:
+    - id: mypy
+      additional_dependencies: ["types-PyYAML", "types-setuptools"]
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7193363b6..4cc861849 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,7 +14,7 @@
 * Simplified the `nwbinspector.testing` configuration framework. [#509](https://github.com/NeurodataWithoutBorders/nwbinspector/pull/509)
 * Cleaned old references to non-recent PyNWB and HDMF versions. Current policy is that latest NWB Inspector releases should only support compatibility with latest PyNWB and HDMF. [#510](https://github.com/NeurodataWithoutBorders/nwbinspector/pull/510)
 * Swapped setup approach to the modern `pyproject.toml` standard. [#507](https://github.com/NeurodataWithoutBorders/nwbinspector/pull/507)
-
+* Added complete annotation typing and integrated Mypy into pre-commit. [#520](https://github.com/NeurodataWithoutBorders/nwbinspector/pull/520)
diff --git a/docs/conf.py b/docs/conf.py
index 998c66e8d..a551bb704 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -8,7 +8,7 @@
 sys.path.append(str(Path(__file__).parent))
 from conf_extlinks import extlinks, intersphinx_mapping

-sys.path.insert(0, Path(__file__).resolve().parents[1])
+sys.path.insert(0, str(Path(__file__).resolve().parents[1]))

 project = "NWBInspector"
 copyright = "2022-2024, CatalystNeuro"
diff --git a/pyproject.toml b/pyproject.toml
index 3839ef739..41ed89967 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -92,7 +92,17 @@ extend-exclude = '''
 exclude = ["docs/*"]

 [tool.ruff.lint]
-select = ["F401", "F541", "I"] # TODO: eventually, expand to other 'D', and other 'F' linting
+# TODO: eventually, expand to other 'D', and other 'F' linting
+select = [
+    "F401",
+    "F541",
+    "I",
+    "ANN001",
+    "ANN201",
+    "ANN202",
+    "ANN205",
+    "ANN206",
+]
 fixable = ["ALL"]

 [tool.ruff.lint.per-file-ignores]
@@ -105,6 +115,7 @@ fixable = ["ALL"]
 "src/nwbinspector/inspector_tools/__init__.py" = ["F401", "I"]
 "src/nwbinspector/version/__init__.py" = ["F401", "I"]
 "src/nwbinspector/register_checks/__init__.py" = ["F401", "I"]
+"tests/*" = ["ANN"]

 [tool.ruff.lint.isort]
 relative-imports-order = "closest-to-furthest"
diff --git a/src/nwbinspector/_configuration.py b/src/nwbinspector/_configuration.py
index c5d3995e6..efa62f82b 100644
--- a/src/nwbinspector/_configuration.py
+++ b/src/nwbinspector/_configuration.py
@@ -3,22 +3,20 @@
 import json
 from pathlib import Path
 from types import FunctionType
-from typing import Optional
+from typing import Optional, Union

 import jsonschema
 import yaml
+from typing_extensions import Callable

-from nwbinspector.utils import PathType
+from ._registration import Importance, available_checks

-from . import available_checks
-from ._registration import Importance
-
-INTERNAL_CONFIGS = dict(
+INTERNAL_CONFIGS: dict[str, Path] = dict(
     dandi=Path(__file__).parent / "_internal_configs" / "dandi.inspector_config.yaml",
 )


-def validate_config(config: dict):
+def validate_config(config: dict) -> None:
     """Validate an instance of configuration against the official schema."""
     config_schema_file_path = Path(__file__).parent / "_internal_configs" / "config.schema.json"
     with open(file=config_schema_file_path, mode="r") as fp:
@@ -26,7 +24,7 @@ def validate_config(config: dict):
     jsonschema.validate(instance=config, schema=schema)


-def _copy_function(function):
+def _copy_function(function: Callable) -> Callable:
     """Copy the core parts of a given function, excluding wrappers, then return a new function."""
     copied_function = FunctionType(
         function.__code__, function.__globals__, function.__name__, function.__defaults__, function.__closure__
@@ -34,10 +32,11 @@ def _copy_function(function):

     # in case f was given attrs (note this dict is a shallow copy)
     copied_function.__dict__.update(function.__dict__)
+
     return copied_function


-def copy_check(check):
+def copy_check(check: Callable) -> Callable:
     """
     Copy a check function so that internal attributes can be adjusted without changing the original function.

@@ -47,11 +46,12 @@ def copy_check(check):
     see https://github.com/NeurodataWithoutBorders/nwbinspector/pull/218 for explanation.
     """
     copied_check = _copy_function(function=check)
-    copied_check.__wrapped__ = _copy_function(function=check.__wrapped__)
+    copied_check.__wrapped__ = _copy_function(function=check.__wrapped__)  # type: ignore
+
     return copied_check


-def load_config(filepath_or_keyword: PathType) -> dict:
+def load_config(filepath_or_keyword: Union[str, Path]) -> dict:
     """
     Load a config dictionary either via keyword search of the internal configs, or an explicit filepath.

@@ -59,14 +59,15 @@ def load_config(filepath_or_keyword: PathType) -> dict:
       - 'dandi'
           For all DANDI archive related practices, including validation and upload.
     """
-    file = INTERNAL_CONFIGS.get(filepath_or_keyword, filepath_or_keyword)
+    file = INTERNAL_CONFIGS.get(str(filepath_or_keyword), filepath_or_keyword)
     with open(file=file, mode="r") as stream:
         config = yaml.safe_load(stream=stream)
+
     return config


 def configure_checks(
-    checks: list = available_checks,
+    checks: Optional[list] = None,
     config: Optional[dict] = None,
     ignore: Optional[list[str]] = None,
     select: Optional[list[str]] = None,
@@ -77,12 +78,12 @@ def configure_checks(

     Parameters
     ----------
+    checks : list of check functions, optional
+        If None, defaults to all registered checks.
     config : dict
         Dictionary valid against our JSON configuration schema.
         Can specify a mapping of importance levels and list of check functions whose importance you wish to change.
         Typically loaded via json.load from a valid .json file
-    checks : list of check functions
-        Defaults to all registered checks.
     ignore: list, optional
         Names of functions to skip.
     select: list, optional
@@ -100,6 +101,8 @@ def configure_checks(

         The default is the lowest level, BEST_PRACTICE_SUGGESTION.
     """
+    checks = checks or available_checks
+
     if ignore is not None and select is not None:
         raise ValueError("Options 'ignore' and 'select' cannot both be used.")
     if importance_threshold not in Importance:
@@ -107,9 +110,10 @@ def configure_checks(
             f"Indicated importance_threshold ({importance_threshold}) is not a valid importance level! Please choose "
             "from [CRITICAL_IMPORTANCE, BEST_PRACTICE_VIOLATION, BEST_PRACTICE_SUGGESTION]."
         )
+
+    checks_out: list = []
     if config is not None:
         validate_config(config=config)
-        checks_out = []
         ignore = ignore or []
         for check in checks:
             mapped_check = copy_check(check=check)
@@ -118,7 +122,7 @@ def configure_checks(
                 if importance_name == "SKIP":
                     ignore.append(check.__name__)
                     continue
-                mapped_check.importance = Importance[importance_name]
+                mapped_check.importance = Importance[importance_name]  # type: ignore
                 # Output wrappers are apparently parsed at time of wrapping not of time of output return...
                 # Attempting to re-wrap the copied function if the importance level is being adjusted...
                 # From https://github.com/NeurodataWithoutBorders/nwbinspector/issues/302
@@ -133,5 +137,6 @@ def configure_checks(
     elif ignore:
         checks_out = [x for x in checks_out if x.__name__ not in ignore]
     if importance_threshold:
-        checks_out = [x for x in checks_out if x.importance.value >= importance_threshold.value]
+        checks_out = [x for x in checks_out if x.importance.value >= importance_threshold.value]  # type: ignore
+
     return checks_out
diff --git a/src/nwbinspector/_dandi_inspection.py b/src/nwbinspector/_dandi_inspection.py
index 1b3a04167..084724ec5 100644
--- a/src/nwbinspector/_dandi_inspection.py
+++ b/src/nwbinspector/_dandi_inspection.py
@@ -21,7 +21,7 @@ def inspect_dandiset(
     importance_threshold: Union[str, Importance] = Importance.BEST_PRACTICE_SUGGESTION,
     skip_validate: bool = False,
     show_progress_bar: bool = True,
-    client: Union["dandi.dandiapi.DandiAPIClient", None] = None,
+    client: Union["dandi.dandiapi.DandiAPIClient", None] = None,  # type: ignore
 ) -> Iterable[Union[InspectorMessage, None]]:
     """
     Inspect a Dandiset for common issues.
@@ -73,12 +73,6 @@ def inspect_dandiset(

     dandiset = client.get_dandiset(dandiset_id=dandiset_id, version_id=dandiset_version)

-    # if not any(
-    #     asset_type.get("identifier", "") != "RRID:SCR_015242"  # Identifier for NWB standard
-    #     for asset_type in dandiset.get_raw_metadata().get("assetsSummary", {}).get("dataStandard", [])
-    # ):
-    #     return None
-
     nwb_assets = [asset for asset in dandiset.get_assets() if ".nwb" in pathlib.Path(asset.path).suffixes]

     nwb_assets_iterator = nwb_assets
@@ -101,7 +95,7 @@ def inspect_dandiset(
             importance_threshold=importance_threshold,
             skip_validate=skip_validate,
         ):
-            message.file_path = asset.path
+            message.file_path = asset.path  # type: ignore
             yield message


@@ -110,13 +104,13 @@ def inspect_dandi_file_path(
     dandi_file_path: str,
     dandiset_id: str,
     dandiset_version: Union[str, Literal["draft"], None] = None,
-    config: Union[str, pathlib.Path, dict, Literal["dandi"]] = "dandi",
+    config: Union[str, pathlib.Path, dict, Literal["dandi"], None] = "dandi",
     checks: Union[list, None] = None,
     ignore: Union[list[str], None] = None,
     select: Union[list[str], None] = None,
     importance_threshold: Union[str, Importance] = Importance.BEST_PRACTICE_SUGGESTION,
     skip_validate: bool = False,
-    client: Union["dandi.dandiapi.DandiAPIClient", None] = None,
+    client: Union["dandi.dandiapi.DandiAPIClient", None] = None,  # type: ignore
 ) -> Iterable[Union[InspectorMessage, None]]:
     """
     Inspect a Dandifile for common issues.
@@ -131,7 +125,7 @@ def inspect_dandi_file_path(
         The specific published version of the Dandiset to inspect.
        If None, the latest version is used.
        If there are no published versions, then 'draft' is used instead.
-    config : file path, dictionary, or "dandi", default: "dandi"
+    config : file path, dictionary, "dandi", or None, default: "dandi"
         If a file path, loads the dictionary configuration from the file.
         If a dictionary, it must be valid against the configuration schema.
         If "dandi", uses the requirements for DANDI validation.
@@ -177,14 +171,14 @@ def inspect_dandi_file_path(
             importance_threshold=importance_threshold,
             skip_validate=skip_validate,
         ):
-            message.file_path = dandi_file_path
+            message.file_path = dandi_file_path  # type: ignore
             yield message


 def inspect_url(
     *,
     url: str,
-    config: Union[str, pathlib.Path, dict, Literal["dandi"]] = "dandi",
+    config: Union[str, pathlib.Path, dict, Literal["dandi"], None] = "dandi",
     checks: Union[list, None] = None,
     ignore: Union[list[str], None] = None,
     select: Union[list[str], None] = None,
@@ -202,7 +196,7 @@ def inspect_url(
         https://dandiarchive.s3.amazonaws.com/blobs/636/57e/63657e32-ad33-4625-b664-31699b5bf664

         Note: this must be the `https` URL, not the 's3://' form.
-    config : file path, dictionary, or "dandi", default: "dandi"
+    config : file path, dictionary, "dandi", or None, default: "dandi"
         If a file path, loads the dictionary configuration from the file.
         If a dictionary, it must be valid against the configuration schema.
         If "dandi", uses the requirements for DANDI validation.
@@ -232,9 +226,10 @@ def inspect_url(
     filterwarnings(action="ignore", message="No cached namespaces found in .*")
     filterwarnings(action="ignore", message="Ignoring cached namespace .*")

-    if not isinstance(config, dict):
+    if isinstance(config, (str, pathlib.Path)):
         config = load_config(filepath_or_keyword=config)
-    validate_config(config=config)
+    if isinstance(config, dict):
+        validate_config(config=config)

     byte_stream = remfile.File(url=url)
     with (
@@ -249,7 +244,7 @@ def inspect_url(
                     importance=Importance.PYNWB_VALIDATION,
                     check_function_name=validation_error.name,
                     location=validation_error.location,
-                    file_path=nwbfile_path,
+                    file_path=url,
                 )

         nwbfile = io.read()
@@ -262,5 +257,5 @@ def inspect_url(
             select=select,
             importance_threshold=importance_threshold,
         ):
-            message.file_path = url
+            message.file_path = url  # type: ignore
             yield message
diff --git a/src/nwbinspector/_formatting.py b/src/nwbinspector/_formatting.py
index 22c4455e9..baf147018 100644
--- a/src/nwbinspector/_formatting.py
+++ b/src/nwbinspector/_formatting.py
@@ -8,19 +8,20 @@
 from enum import Enum
 from pathlib import Path
 from platform import platform
-from typing import Optional, Union
+from typing import Any, Optional, Union

 import numpy as np
+from packaging.version import Version

 from ._organization import organize_messages
 from ._types import Importance, InspectorMessage
-from .utils import FilePathType, get_package_version
+from .utils import get_package_version


 class InspectorOutputJSONEncoder(json.JSONEncoder):
     """Custom JSONEncoder for the NWBInspector."""

-    def default(self, o):  # noqa D102
+    def default(self, o: object) -> Any:  # noqa D102
         if isinstance(o, InspectorMessage):
             return o.__dict__
         if isinstance(o, Enum):
@@ -31,7 +32,7 @@ def default(self, o):  # noqa D102
         return super().default(o)


-def _get_report_header():
+def _get_report_header() -> dict[str, str]:
     """Grab basic information from system at time of report generation."""
     return dict(
         Timestamp=str(datetime.now().astimezone()),
@@ -44,8 +45,8 @@ class FormatterOptions:
     """Class structure for defining all free attributes for the design of a report format."""

     def __init__(
-        self, indent_size: int = 2, indent: Optional[str] = None, section_headers: list[str] = ["=", "-", "~"]
-    ):
+        self, indent_size: int = 2, indent: Optional[str] = None, section_headers: tuple[str, ...] = ("=", "-", "~")
+    ) -> None:
         """
         Class that defines all the format parameters used by the generic MessageFormatter.

@@ -58,11 +59,11 @@ def __init__(
             Defines the specific indentation to inject between numerical sectioning and section name or message.
             Overrides indent_size.
             Defaults to " " * indent_size.
-        section_headers : list of strings, optional
+        section_headers : tuple of strings
             List of characters that will be injected under the display of each new section of the report.
             If levels is longer than this list, the last item will be repeated over the remaining levels.
             If levels is shorter than this list, only the first len(levels) of items will be used.
-            Defaults to the .rst style for three sub-sections: ["=", "-", "~"]
+            Defaults to the .rst style for three subsections: ("=", "-", "~")
         """
         # TODO
         # Future custom options could include section break sizes, section-specific indents, etc.
@@ -75,14 +76,14 @@ class MessageFormatter:

     def __init__(
         self,
-        messages: list[InspectorMessage],
+        messages: list[Optional[InspectorMessage]],
         levels: list[str],
         reverse: Optional[list[bool]] = None,
         detailed: bool = False,
         formatter_options: Optional[FormatterOptions] = None,
-    ):
+    ) -> None:
         self.nmessages = len(messages)
-        self.nfiles = len(set(message.file_path for message in messages))
+        self.nfiles = len(set(message.file_path for message in messages))  # type: ignore
         self.message_count_by_importance = self._count_messages_by_importance(messages=messages)
         self.initial_organized_messages = organize_messages(messages=messages, levels=levels, reverse=reverse)
         self.detailed = detailed
@@ -100,29 +101,29 @@ def __init__(
                 formatter_options, FormatterOptions
             ), "'formatter_options' is not an instance of FormatterOptions!"
             self.formatter_options = formatter_options
-        self.formatter_options.section_headers.extend(
-            [self.formatter_options.section_headers[-1]] * (self.nlevels - len(self.formatter_options.section_headers))
-        )
+        self.formatter_options.section_headers = self.formatter_options.section_headers + (
+            self.formatter_options.section_headers[-1],
+        ) * (self.nlevels - len(self.formatter_options.section_headers))
         self.message_counter = 0
-        self.formatted_messages = []
+        self.formatted_messages: list = []

     @staticmethod
-    def _count_messages_by_importance(messages: list[InspectorMessage]) -> dict[str, int]:
+    def _count_messages_by_importance(messages: list[Optional[InspectorMessage]]) -> dict[str, int]:
         message_count_by_importance = {importance_level.name: 0 for importance_level in Importance}
         for message in messages:
-            message_count_by_importance[message.importance.name] += 1
+            message_count_by_importance[message.importance.name] += 1  # type: ignore
         for key in [keys for keys, count in message_count_by_importance.items() if count == 0]:
             message_count_by_importance.pop(key)
         return message_count_by_importance

     @staticmethod
-    def _get_name(obj) -> str:
+    def _get_name(obj: Union[Enum, str]) -> str:
         if isinstance(obj, Enum):
             return obj.name
         if isinstance(obj, str):
             return obj

-    def _get_message_header(self, message: InspectorMessage):
+    def _get_message_header(self, message: InspectorMessage) -> str:
         message_header = ""
         if "file_path" in self.free_levels:
             message_header += f"{message.file_path} - "
@@ -138,17 +139,17 @@ def _get_message_header(self, message: InspectorMessage):
             message_header += f"with name '{message.object_name}'"
         return message_header

-    def _get_message_increment(self, level_counter: list[int]):
+    def _get_message_increment(self, level_counter: list[int]) -> str:
         return (
             f"{'.'.join(np.array(level_counter, dtype=str))}.{self.message_counter}"
             f"{self.formatter_options.indent}"
         )

     def _add_subsection(
         self,
-        organized_messages: dict[str, Union[dict, list[InspectorMessage]]],
+        organized_messages: dict,
         levels: list[str],
         level_counter: list[int],
-    ):
+    ) -> None:
         """Recursive helper for display_messages."""
         this_level_counter = list(level_counter)  # local copy passed from previous recursion level
         if len(levels) > 1:
@@ -217,31 +218,38 @@ def format_messages(self) -> list[str]:


 def format_messages(
-    messages: list[InspectorMessage],
-    levels: list[str] = None,
+    messages: list[Optional[InspectorMessage]],
+    levels: Optional[list[str]] = None,
     reverse: Optional[list[bool]] = None,
     detailed: bool = False,
 ) -> list[str]:
     """Print InspectorMessages in order specified by the organization structure."""
-    if levels is None:
-        levels = ["file_path", "importance"]
+    levels = levels or ["file_path", "importance"]
+
     message_formatter = MessageFormatter(messages=messages, levels=levels, reverse=reverse, detailed=detailed)
     formatted_messages = message_formatter.format_messages()
+
     return formatted_messages


-def print_to_console(formatted_messages: list[str]):
+def print_to_console(formatted_messages: list[str]) -> None:
     """Print report file contents to console."""
     sys.stdout.write(os.linesep * 2)
     for line in formatted_messages:
         sys.stdout.write(line + "\n")

+    return None
+

-def save_report(report_file_path: FilePathType, formatted_messages: list[str], overwrite=False):
+def save_report(report_file_path: Union[str, Path], formatted_messages: list[str], overwrite: bool = False) -> None:
     """Write the list of organized check results to a nicely formatted text file."""
     report_file_path = Path(report_file_path)
+
     if report_file_path.exists() and not overwrite:
         raise FileExistsError(f"The file {report_file_path} already exists! Set 'overwrite=True' or pass '-o' flag.")
+
     with open(file=report_file_path, mode="w", newline="\n") as file:
         for line in formatted_messages:
             file.write(line + "\n")
+
+    return None
diff --git a/src/nwbinspector/_nwb_inspection.py b/src/nwbinspector/_nwb_inspection.py
index 55406998e..0c7b95669 100644
--- a/src/nwbinspector/_nwb_inspection.py
+++ b/src/nwbinspector/_nwb_inspection.py
@@ -12,12 +12,10 @@
 from natsort import natsorted
 from tqdm import tqdm

-from . import available_checks
 from ._configuration import configure_checks
-from ._registration import Importance, InspectorMessage
-from .tools._read_nwbfile import read_nwbfile
+from ._registration import Importance, InspectorMessage, available_checks
+from .tools._read_nwbfile import read_nwbfile, read_nwbfile_and_io
 from .utils import (
-    FilePathType,
     OptionalListOfStrings,
     PathType,
     calculate_number_of_cpu,
@@ -38,7 +36,7 @@ def inspect_all(
     stream: bool = False,  # TODO: remove after 3/1/2025
     version_id: Optional[str] = None,  # TODO: remove after 3/1/2025
     modules: OptionalListOfStrings = None,
-):
+) -> Iterable[Union[InspectorMessage, None]]:
     """
     Inspect a local NWBFile or folder of NWBFiles and return suggestions for improvements according to best practices.
@@ -98,22 +96,22 @@ def inspect_all(

     # TODO: remove these blocks after 3/1/2025
     if version_id is not None:
-        message = (
+        deprecation_message = (
             "The `version_id` argument is deprecated and will be removed after 3/1/2025. "
             "Please call `nwbinspector.inspect_dandiset` with the argument `dandiset_version` instead."
         )
-        warn(message=message, category=DeprecationWarning, stacklevel=2)
+        warn(message=deprecation_message, category=DeprecationWarning, stacklevel=2)

     if stream:
         from ._dandi_inspection import inspect_dandiset

-        message = (
+        warning_message = (
             "The `stream` argument is deprecated and will be removed after 3/1/2025. "
             "Please call `nwbinspector.inspect_dandiset` instead."
         )
-        warn(message=message, category=DeprecationWarning, stacklevel=2)
+        warn(message=warning_message, category=DeprecationWarning, stacklevel=2)

         for message in inspect_dandiset(
-            dandiset_id=path,
+            dandiset_id=str(path),
             dandiset_version=version_id,
             config=config,
             ignore=ignore,
@@ -124,7 +122,7 @@ def inspect_all(

         return None

-    n_jobs = calculate_number_of_cpu(requested_cpu=n_jobs)
+    calculated_number_of_jobs = calculate_number_of_cpu(requested_cpu=n_jobs)
     if progress_bar_options is None:
         progress_bar_options = dict(position=0, leave=False)
@@ -151,7 +149,7 @@ def inspect_all(
                 message=traceback.format_exc(),
                 importance=Importance.ERROR,
                 check_function_name=f"During io.read() - {type(exception)}: {str(exception)}",
-                file_path=nwbfile_path,
+                file_path=str(nwbfile_path),
             )

     if len(identifiers) != len(nwbfiles):
@@ -175,54 +173,57 @@ def inspect_all(
     nwbfiles_iterable = nwbfiles
     if progress_bar:
         nwbfiles_iterable = progress_bar_class(nwbfiles_iterable, **progress_bar_options)
-    if n_jobs != 1:
+    if calculated_number_of_jobs == 1:
+        for nwbfile_path in nwbfiles_iterable:  # type: ignore
+            for message in inspect_nwbfile(nwbfile_path=nwbfile_path, checks=checks, skip_validate=skip_validate):
+                yield message
+    else:
         progress_bar_options.update(total=len(nwbfiles))
         futures = []
-        n_jobs = None if n_jobs == -1 else n_jobs  # concurrents uses None instead of -1 for 'auto' mode
-        with ProcessPoolExecutor(max_workers=n_jobs) as executor:
+        # concurrents uses None instead of -1 for 'auto' mode
+        max_workers = None if calculated_number_of_jobs == -1 else calculated_number_of_jobs
+        with ProcessPoolExecutor(max_workers=max_workers) as executor:
             for nwbfile_path in nwbfiles:
                 futures.append(
                     executor.submit(
                         _pickle_inspect_nwb,
-                        nwbfile_path=nwbfile_path,
+                        nwbfile_path=str(nwbfile_path),
                         checks=checks,
                         skip_validate=skip_validate,
                     )
                 )
-        nwbfiles_iterable = as_completed(futures)
+        async_nwbfiles_iterable = as_completed(futures)
         if progress_bar:
-            nwbfiles_iterable = progress_bar_class(nwbfiles_iterable, **progress_bar_options)
-        for future in nwbfiles_iterable:
+            async_nwbfiles_iterable = progress_bar_class(async_nwbfiles_iterable, **progress_bar_options)
+        for future in async_nwbfiles_iterable:
             for message in future.result():
                 if stream:
                     message.file_path = nwbfiles[message.file_path]
                 yield message
-    else:
-        for nwbfile_path in nwbfiles_iterable:
-            for message in inspect_nwbfile(nwbfile_path=nwbfile_path, checks=checks, skip_validate=skip_validate):
-                yield message


 def _pickle_inspect_nwb(
     nwbfile_path: str,
-    checks: list = available_checks,
+    checks: Optional[list] = None,
     skip_validate: bool = False,
-):
+) -> Iterable[Union[InspectorMessage, None]]:
     """Auxiliary function for inspect_all to run in parallel using the ProcessPoolExecutor."""
+    checks = checks or available_checks
+
     return list(inspect_nwbfile(nwbfile_path=nwbfile_path, checks=checks, skip_validate=skip_validate))


 def inspect_nwbfile(
-    nwbfile_path: FilePathType,
+    nwbfile_path: Union[str, Path],
     driver: Optional[str] = None,  # TODO: remove after 3/1/2025
     skip_validate: bool = False,
     max_retries: Optional[int] = None,  # TODO: remove after 3/1/2025
-    checks: list = available_checks,
-    config: dict = None,
+    checks: Optional[list] = None,
+    config: Optional[dict] = None,
     ignore: OptionalListOfStrings = None,
     select: OptionalListOfStrings = None,
     importance_threshold: Union[str, Importance] = Importance.BEST_PRACTICE_SUGGESTION,
-) -> Iterable[InspectorMessage]:
+) -> Iterable[Union[InspectorMessage, None]]:
     """
     Open an NWB file, inspect the contents, and return suggestions for improvements according to best practices.
@@ -256,6 +257,7 @@ def inspect_nwbfile(

         The default is the lowest level, BEST_PRACTICE_SUGGESTION.
     """
+    checks = checks or available_checks
     # TODO: remove error after 3/1/2025
     if driver is not None or max_retries is not None:
         message = (
@@ -269,7 +271,7 @@ def inspect_nwbfile(
     filterwarnings(action="ignore", message="Ignoring cached namespace .*")

     try:
-        in_memory_nwbfile, io = read_nwbfile(nwbfile_path=nwbfile_path, return_io=True)
+        in_memory_nwbfile, io = read_nwbfile_and_io(nwbfile_path=nwbfile_path)

         if not skip_validate:
             validation_errors = pynwb.validate(io=io)
@@ -290,7 +292,7 @@ def inspect_nwbfile(
             select=select,
             importance_threshold=importance_threshold,
         ):
-            inspector_message.file_path = nwbfile_path
+            inspector_message.file_path = nwbfile_path  # type: ignore
             yield inspector_message
     except Exception as exception:
         yield InspectorMessage(
@@ -302,13 +304,15 @@ def inspect_nwbfile(


 # TODO: deprecate once subject types and dandi schemas have been extended
-def _intercept_in_vitro_protein(nwbfile_object: pynwb.NWBFile, checks: Optional[list] = None) -> list[callable]:
+def _intercept_in_vitro_protein(nwbfile_object: pynwb.NWBFile, checks: Optional[list] = None) -> list:
     """
     If the special 'protein' subject_id is specified, return a truncated list of checks to run.

     This is a temporary method for allowing upload of certain in vitro data to DANDI and is expected to be replaced
     in future versions.
     """
+    checks = checks or available_checks
+
     subject_related_check_names = [
         "check_subject_exists",
         "check_subject_id_exists",
@@ -319,7 +323,9 @@ def _intercept_in_vitro_protein(nwbfile_object: pynwb.NWBFile, checks: Optional[
         "check_subject_proper_age_range",
     ]
     subject_related_dandi_requirements = [
-        check.importance == Importance.CRITICAL for check in checks if check.__name__ in subject_related_check_names
+        check.importance == Importance.CRITICAL  # type: ignore
+        for check in checks
+        if check.__name__ in subject_related_check_names
     ]

     subject = getattr(nwbfile_object, "subject", None)
@@ -330,6 +336,7 @@ def _intercept_in_vitro_protein(nwbfile_object: pynwb.NWBFile, checks: Optional[
     ):
         non_subject_checks = [check for check in checks if check.__name__ not in subject_related_check_names]
         return non_subject_checks
+
     return checks


@@ -340,7 +347,7 @@ def inspect_nwbfile_object(
     ignore: Optional[list[str]] = None,
     select: Optional[list[str]] = None,
     importance_threshold: Union[str, Importance] = Importance.BEST_PRACTICE_SUGGESTION,
-) -> Iterable[InspectorMessage]:
+) -> Iterable[Union[InspectorMessage, None]]:
     """
     Inspect an in-memory NWBFile object and return suggestions for improvements according to best practices.
@@ -391,7 +398,7 @@ def run_checks(
     checks: list,
     progress_bar_class: Optional[Type[tqdm]] = None,
     progress_bar_options: Optional[dict] = None,
-) -> Iterable[InspectorMessage]:
+) -> Iterable[Union[InspectorMessage, None]]:
     """
     Run checks on an open NWBFile object.
@@ -436,7 +443,7 @@ def run_checks(
                     message=traceback.format_exc(),
                     importance=Importance.ERROR,
                     check_function_name=check_function_name,
-                    file_path=nwbfile_path,
+                    file_path="unknown",
                 )
             if isinstance(output, InspectorMessage):
                 # temporary solution to https://github.com/dandi/dandi-cli/issues/1031
diff --git a/src/nwbinspector/_nwbinspector_cli.py b/src/nwbinspector/_nwbinspector_cli.py
index 6e9fa5678..db42f5c64 100644
--- a/src/nwbinspector/_nwbinspector_cli.py
+++ b/src/nwbinspector/_nwbinspector_cli.py
@@ -12,6 +12,7 @@
 from ._configuration import load_config
 from ._dandi_inspection import inspect_dandi_file_path, inspect_dandiset, inspect_url
 from ._formatting import (
+    InspectorOutputJSONEncoder,
     _get_report_header,
     format_messages,
     print_to_console,
@@ -89,9 +90,9 @@ def _nwbinspector_cli(
     path: str,
     stream: bool = False,
     version_id: Union[str, None] = None,
-    report_file_path: str = None,
+    report_file_path: Union[str, None] = None,
     config: Union[str, None] = None,
-    levels: str = None,
+    levels: Union[str, None] = None,
     reverse: Union[str, None] = None,
     overwrite: bool = False,
     ignore: Union[str, None] = None,
@@ -137,16 +138,17 @@ def _nwbinspector_cli(
     handled_select = select if select is None else select.split(",")
     handled_importance_threshold = Importance[threshold]
     show_progress_bar = True if progress_bar is None else strtobool(progress_bar)
-    modules = [] if modules is None else modules.split(",")
+    handled_modules = [] if modules is None else modules.split(",")

     # Trigger the import of custom checks that have been registered and exposed to their respective modules
-    for module in modules:
+    for module in handled_modules:
         importlib.import_module(name=module)

     # Scan entire Dandiset
     if stream and ":" not in path:
         dandiset_id = path
         dandiset_version = version_id
+
         messages_iterator = inspect_dandiset(
             dandiset_id=dandiset_id,
             dandiset_version=dandiset_version,
@@ -160,6 +162,8 @@ def _nwbinspector_cli(
     # Scan a single NWB file in a Dandiset
     elif stream and ":" in path:
         dandiset_id, dandi_file_path = path.split(":")
+        dandiset_version = version_id
+
         messages_iterator = inspect_dandi_file_path(
             dandi_file_path=dandi_file_path,
             dandiset_id=dandiset_id,
@@ -173,8 +177,9 @@ def _nwbinspector_cli(
     # Scan single NWB file at URL
     elif stream and path_is_url:
         dandi_s3_url = path
+
         messages_iterator = inspect_url(
-            dandi_s3_url=dandi_s3_url,
+            url=dandi_s3_url,
             config=handled_config,
             ignore=handled_ignore,
             select=handled_select,
diff --git a/src/nwbinspector/_organization.py b/src/nwbinspector/_organization.py
index 9caaee5e3..ed2c4d7fb 100644
--- a/src/nwbinspector/_organization.py
+++ b/src/nwbinspector/_organization.py
@@ -8,7 +8,7 @@
 from ._registration import InspectorMessage


-def _sort_unique_values(unique_values: list, reverse: bool = False):
+def _sort_unique_values(unique_values: list, reverse: bool = False) -> list:
     """Technically, the 'set' method applies basic sorting to the unique contents, but natsort is more general."""
     if any(unique_values) and isinstance(unique_values[0], Enum):
         return natsorted(unique_values, key=lambda x: -x.value, reverse=reverse)
@@ -16,7 +16,9 @@ def _sort_unique_values(unique_values: list, reverse: bool = False):
         return natsorted(unique_values, reverse=reverse)


-def organize_messages(messages: list[InspectorMessage], levels: list[str], reverse: Optional[list[bool]] = None):
+def organize_messages(
+    messages: list[Optional[InspectorMessage]], levels: list[str], reverse: Optional[list[bool]] = None
+) -> dict:
     """
     General function for organizing list of InspectorMessages.

@@ -25,9 +27,12 @@ def organize_messages(messages: list[InspectorMessage], levels: list[str], rever
     Parameters
     ----------
     messages : list of InspectorMessages
-    levels: list of strings
+    levels : list of strings
         Each string in this list must correspond onto an attribute of the InspectorMessage class, excluding the
         'message' text and 'object_name' (this will be coupled to the 'object_type').
+    reverse : list of bool, optional
+        If provided, this should be a list of booleans that correspond to the 'levels' argument.
+        If True, the values will be sorted in reverse order.
     """
     assert all([x not in levels for x in ["message", "object_name", "severity"]]), (
         "You must specify levels to organize by that correspond to attributes of the InspectorMessage class, excluding "
diff --git a/src/nwbinspector/_registration.py b/src/nwbinspector/_registration.py
index 5ce7fa515..f43e5caf6 100644
--- a/src/nwbinspector/_registration.py
+++ b/src/nwbinspector/_registration.py
@@ -1,13 +1,14 @@
 """Primary decorator used on a check function to add it to the registry and automatically parse its output."""

 from functools import wraps
-from typing import Optional
+from typing import List, Optional, Union

 import h5py
 import zarr
 from pynwb import NWBFile
 from pynwb.ecephys import Device, ElectrodeGroup
 from pynwb.file import Subject
+from typing_extensions import Callable

 from ._types import Importance, InspectorMessage, Severity

@@ -16,7 +17,7 @@
 # TODO: neurodata_type could have annotation hdmf.utils.ExtenderMeta, which seems to apply to all currently checked
 # objects. We can wait and see how well that holds up before adding it in officially.
-def register_check(importance: Importance, neurodata_type):
+def register_check(importance: Importance, neurodata_type: object) -> Callable:
     """
     Wrap a check function with this decorator to add it to the check registry and automatically parse some output.
@@ -36,7 +37,7 @@ def register_check(importance: Importance, neurodata_type):
         If this check is intended to apply to any general NWBFile object, set neurodata_type to None.
     """

-    def register_check_and_auto_parse(check_function):
+    def register_check_and_auto_parse(check_function: Callable) -> Callable:
         if importance not in [
             Importance.CRITICAL,
             Importance.BEST_PRACTICE_VIOLATION,
@@ -47,23 +48,30 @@ def register_check_and_auto_parse(check_function):
                 "importance level! Please choose one of Importance.CRITICAL, Importance.BEST_PRACTICE_VIOLATION, "
                 "or Importance.BEST_PRACTICE_SUGGESTION."
             )
-        check_function.importance = importance
-        check_function.neurodata_type = neurodata_type
+        check_function.importance = importance  # type: ignore
+        check_function.neurodata_type = neurodata_type  # type: ignore

         @wraps(check_function)
-        def auto_parse_some_output(*args, **kwargs) -> InspectorMessage:
+        def auto_parse_some_output(
+            *args, **kwargs
+        ) -> Union[InspectorMessage, List[Union[InspectorMessage, None]], None]:
             if args:
-                obj = args[0]
+                neurodata_object = args[0]
             else:
-                obj = kwargs[list(kwargs)[0]]
+                neurodata_object = kwargs[list(kwargs)[0]]
             output = check_function(*args, **kwargs)
-            auto_parsed_result = None
+
+            auto_parsed_result: Union[InspectorMessage, List[Union[InspectorMessage, None]], None] = None
             if isinstance(output, InspectorMessage):
-                auto_parsed_result = _auto_parse(check_function=check_function, obj=obj, result=output)
+                auto_parsed_result = _auto_parse(
+                    check_function=check_function, neurodata_object=neurodata_object, result=output
+                )
             elif output is not None:
                 auto_parsed_result = list()
                 for result in output:
-                    auto_parsed_result.append(_auto_parse(check_function=check_function, obj=obj, result=result))
+                    auto_parsed_result.append(
+                        _auto_parse(check_function=check_function, neurodata_object=neurodata_object, result=result)
+                    )
                 if not any(auto_parsed_result):
                     auto_parsed_result = None
             return auto_parsed_result
@@ -75,7 +83,9 @@ def auto_parse_some_output(*args, **kwargs) -> InspectorMessage:
     return register_check_and_auto_parse


-def _auto_parse(check_function, obj, result: Optional[InspectorMessage] = None):
+def _auto_parse(
+    check_function: Callable, neurodata_object: object, result: Optional[InspectorMessage] = None
+) -> Optional[InspectorMessage]:
     """Automatically fill values in the InspectorMessage from the check function."""
     if result is not None:
         auto_parsed_result = result
@@ -85,21 +95,24 @@ def _auto_parse(check_function, obj, result: Optional[InspectorMessage] = None):
                 f"({check_function.__name__}) is not a valid severity level! Please choose one of "
                 "Severity.HIGH, Severity.LOW, or do not specify any severity."
             )
-        auto_parsed_result.importance = check_function.importance
+        auto_parsed_result.importance = check_function.importance  # type: ignore
         auto_parsed_result.check_function_name = check_function.__name__
-        auto_parsed_result.object_type = type(obj).__name__
-        auto_parsed_result.object_name = obj.name
-        auto_parsed_result.location = _parse_location(neurodata_object=obj)
+        auto_parsed_result.object_type = type(neurodata_object).__name__
+        auto_parsed_result.object_name = neurodata_object.name  # type: ignore
+        auto_parsed_result.location = _parse_location(neurodata_object=neurodata_object)
+
         return auto_parsed_result

+    return None

-def _parse_location(neurodata_object) -> Optional[str]:
+
+def _parse_location(neurodata_object: object) -> Optional[str]:
     """Grab the object location from a dataset or a container content that is a dataset object."""
     known_locations = {
         NWBFile: "/",
         Subject: "/general/subject",
-        Device: f"/general/devices/{neurodata_object.name}",
-        ElectrodeGroup: f"/general/extracellular_ephys/{neurodata_object.name}",
+        Device: f"/general/devices/{neurodata_object.name}",  # type: ignore
+        ElectrodeGroup: f"/general/extracellular_ephys/{neurodata_object.name}",  # type: ignore
         # TODO: add ophys equivalents
     }

@@ -108,14 +121,16 @@ def _parse_location(neurodata_object) -> Optional[str]:
             return val

     # Infer the human-readable path of the object within an NWBFile by tracing its parents
-    if neurodata_object.parent is None:
+    if neurodata_object.parent is None:  # type: ignore
         return "/"

     # Best solution: object is or has a HDF5 Dataset
     if isinstance(neurodata_object, (h5py.Dataset, zarr.Array)):
-        return neurodata_object.name
+        return neurodata_object.name  # type: ignore
     else:
-        for field_name, field in neurodata_object.fields.items():
+        for field_name, field in neurodata_object.fields.items():  # type: ignore
             if isinstance(field, h5py.Dataset):
-                return field.parent.name
+                return field.parent.name  # type: ignore
             elif isinstance(field, zarr.Array):
                 return field.name.removesuffix(f"/{field_name}")
+
+    return None
diff --git a/src/nwbinspector/_types.py b/src/nwbinspector/_types.py
index a329a979d..d1aebd418 100644
--- a/src/nwbinspector/_types.py
+++ b/src/nwbinspector/_types.py
@@ -62,11 +62,11 @@ class InspectorMessage:
     message: str
     importance: Importance = Importance.BEST_PRACTICE_SUGGESTION
     severity: Severity = Severity.LOW
-    check_function_name: str = None
-    object_type: str = None
-    object_name: str = None
+    check_function_name: Optional[str] = None
+    object_type: Optional[str] = None
+    object_name: Optional[str] = None
     location: Optional[str] = None
-    file_path: str = None
+    file_path: Optional[str] = None

     def __repr__(self):
         """Representation for InspectorMessage objects according to black format."""
diff --git a/src/nwbinspector/checks/_behavior.py b/src/nwbinspector/checks/_behavior.py
index fe5b9cfb1..bca5af861 100644
--- a/src/nwbinspector/checks/_behavior.py
+++ b/src/nwbinspector/checks/_behavior.py
@@ -1,5 +1,7 @@
 """Checks for types belonging to the pynwb.behavior module."""

+from typing import Iterable, Optional
+
 import numpy as np
 from pynwb.behavior import CompassDirection, SpatialSeries

@@ -7,7 +9,7 @@


 @register_check(importance=Importance.CRITICAL, neurodata_type=SpatialSeries)
-def check_spatial_series_dims(spatial_series: SpatialSeries):
+def check_spatial_series_dims(spatial_series: SpatialSeries) -> Optional[InspectorMessage]:
     """
     Check if a SpatialSeries has the correct dimensions.
@@ -18,9 +20,11 @@ def check_spatial_series_dims(spatial_series: SpatialSeries):
             message="SpatialSeries should have 1 column (x), 2 columns (x, y), or 3 columns (x, y, z)."
         )

+    return None
+

 @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=CompassDirection)
-def check_compass_direction_unit(compass_direction: CompassDirection):
+def check_compass_direction_unit(compass_direction: CompassDirection) -> Optional[Iterable[InspectorMessage]]:
     """
     Check that SpatialSeries objects within CompassDirection objects have units 'degrees' or 'radians'.

@@ -33,9 +37,13 @@ def check_compass_direction_unit(compass_direction: CompassDirection):
                     f"unit of 'degrees' or 'radians', but '{spatial_series.name}' has units '{spatial_series.unit}'."
                 )

+    return None
+

 @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=SpatialSeries)
-def check_spatial_series_radians_magnitude(spatial_series: SpatialSeries, nelems: int = 200):
+def check_spatial_series_radians_magnitude(
+    spatial_series: SpatialSeries, nelems: int = 200
+) -> Optional[InspectorMessage]:
     """
     Check that SpatialSeries with units radians have data values between -2*pi and 2*pi.

@@ -48,9 +56,13 @@ def check_spatial_series_radians_magnitude(spatial_series: SpatialSeries, nelems
             message="SpatialSeries with units of radians must have values between -2pi and 2pi."
         )

+    return None
+

 @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=SpatialSeries)
-def check_spatial_series_degrees_magnitude(spatial_series: SpatialSeries, nelems: int = 200):
+def check_spatial_series_degrees_magnitude(
+    spatial_series: SpatialSeries, nelems: int = 200
+) -> Optional[InspectorMessage]:
     """
     Check that SpatialSeries with units of degrees have data values between -360 and 360.

@@ -62,3 +74,5 @@ def check_spatial_series_degrees_magnitude(spatial_series: SpatialSeries, nelems
         return InspectorMessage(
             message="SpatialSeries with units of degrees must have values between -360 and 360."
         )
+
+    return None
diff --git a/src/nwbinspector/checks/_ecephys.py b/src/nwbinspector/checks/_ecephys.py
index 5487c04cc..52cd46a72 100644
--- a/src/nwbinspector/checks/_ecephys.py
+++ b/src/nwbinspector/checks/_ecephys.py
@@ -1,5 +1,7 @@
 """Check functions specific to extracellular electrophysiology neurodata types."""

+from typing import Optional
+
 import numpy as np
 from pynwb.ecephys import ElectricalSeries
 from pynwb.misc import Units
@@ -9,10 +11,10 @@


 @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=Units)
-def check_negative_spike_times(units_table: Units):
+def check_negative_spike_times(units_table: Units) -> Optional[InspectorMessage]:
     """Check if the Units table contains negative spike times."""
     if "spike_times" not in units_table:
-        return
+        return None
     if np.any(np.asarray(units_table["spike_times"].target.data[:]) < 0):
         return InspectorMessage(
             message=(
@@ -21,9 +23,11 @@ def check_negative_spike_times(units_table: Units):
             )
         )

+    return None
+

 @register_check(importance=Importance.CRITICAL, neurodata_type=ElectricalSeries)
-def check_electrical_series_dims(electrical_series: ElectricalSeries):
+def check_electrical_series_dims(electrical_series: ElectricalSeries) -> Optional[InspectorMessage]:
     """
     Use the length of the linked electrode region to check the data orientation.
@@ -47,9 +51,13 @@ def check_electrical_series_dims(electrical_series: ElectricalSeries):
             )
         )

+    return None
+

 @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=ElectricalSeries)
-def check_electrical_series_reference_electrodes_table(electrical_series: ElectricalSeries):
+def check_electrical_series_reference_electrodes_table(
+    electrical_series: ElectricalSeries,
+) -> Optional[InspectorMessage]:
     """
     Check that the 'electrodes' of an ElectricalSeries references the ElectrodesTable.

@@ -58,12 +66,14 @@ def check_electrical_series_reference_electrodes_table(electrical_series: Electr
     if electrical_series.electrodes.table.name != "electrodes":
         return InspectorMessage(message="electrodes does not reference an electrodes table.")

+    return None
+

 @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=Units)
-def check_spike_times_not_in_unobserved_interval(units_table: Units, nunits: int = 4):
+def check_spike_times_not_in_unobserved_interval(units_table: Units, nunits: int = 4) -> Optional[InspectorMessage]:
     """Check if a Units table has spike times that occur outside of observed intervals."""
     if not units_table.obs_intervals:
-        return
+        return None
     for unit_spike_times, unit_obs_intervals in zip(
         units_table["spike_times"][:nunits], units_table["obs_intervals"][:nunits]
     ):
@@ -82,3 +92,5 @@ def check_spike_times_not_in_unobserved_interval(units_table: Units, nunits: int
                     "observed intervals."
                 )
             )
+
+    return None
diff --git a/src/nwbinspector/checks/_general.py b/src/nwbinspector/checks/_general.py
index dd5c3afb6..f9b0b9563 100644
--- a/src/nwbinspector/checks/_general.py
+++ b/src/nwbinspector/checks/_general.py
@@ -1,27 +1,33 @@
 """Check functions that examine any general neurodata_type with the available attributes."""

+from typing import Optional
+
 from .._registration import Importance, InspectorMessage, register_check

 COMMON_DESCRIPTION_PLACEHOLDERS = ["no description", "no desc", "none", "placeholder"]


 @register_check(importance=Importance.CRITICAL, neurodata_type=None)
-def check_name_slashes(obj):
+def check_name_slashes(neurodata_object: object) -> Optional[InspectorMessage]:
     """Check if the name of an object contains slashes."""
-    if hasattr(obj, "name") and any((x in obj.name for x in ["/", "\\"])):
+    if hasattr(neurodata_object, "name") and any((x in neurodata_object.name for x in ["/", "\\"])):
         return InspectorMessage(message="Object name contains slashes.")

+    return None
+

 @register_check(importance=Importance.BEST_PRACTICE_SUGGESTION, neurodata_type=None)
-def check_description(obj):
+def check_description(neurodata_object: object) -> Optional[InspectorMessage]:
     """
     Check if the description is not missing or a placeholder.

     Best Practice: :ref:`best_practice_placeholders`
     """
-    if not hasattr(obj, "description"):
-        return
-    if obj.description is None or obj.description.strip(" ") == "":
+    if not hasattr(neurodata_object, "description"):
+        return None
+    if neurodata_object.description is None or neurodata_object.description.strip(" ") == "":
         return InspectorMessage(message="Description is missing.")
-    if obj.description.lower().strip(".") in COMMON_DESCRIPTION_PLACEHOLDERS:
-        return InspectorMessage(message=f"Description ('{obj.description}') is a placeholder.")
+    if neurodata_object.description.lower().strip(".") in COMMON_DESCRIPTION_PLACEHOLDERS:
+        return InspectorMessage(message=f"Description ('{neurodata_object.description}') is a placeholder.")
+
+    return None
diff --git a/src/nwbinspector/checks/_icephys.py b/src/nwbinspector/checks/_icephys.py
index 02cf1d319..5fd02e8fd 100644
--- a/src/nwbinspector/checks/_icephys.py
+++ b/src/nwbinspector/checks/_icephys.py
@@ -1,12 +1,16 @@
 """Checks specific to intracellular electrophysiology neurodata types."""

+from typing import Optional
+
 from pynwb.icephys import IntracellularElectrode

 from .._registration import Importance, InspectorMessage, register_check


 @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=IntracellularElectrode)
-def check_intracellular_electrode_cell_id_exists(intracellular_electrode: IntracellularElectrode):
+def check_intracellular_electrode_cell_id_exists(
+    intracellular_electrode: IntracellularElectrode,
+) -> Optional[InspectorMessage]:
     """
     Check if the IntracellularElectrode contains a cell_id.

@@ -14,3 +18,5 @@ def check_intracellular_electrode_cell_id_exists(intracellular_electrode: Intrac
     """
     if hasattr(intracellular_electrode, "cell_id") and intracellular_electrode.cell_id is None:
         return InspectorMessage(message="Please include a unique cell_id associated with this IntracellularElectrode.")
+
+    return None
diff --git a/src/nwbinspector/checks/_image_series.py b/src/nwbinspector/checks/_image_series.py
index ad5a76757..f59e9f5a1 100644
--- a/src/nwbinspector/checks/_image_series.py
+++ b/src/nwbinspector/checks/_image_series.py
@@ -2,6 +2,7 @@

 import ntpath
 from pathlib import Path
+from typing import Iterable, Optional

 from pynwb.image import ImageSeries
 from pynwb.ophys import TwoPhotonSeries
@@ -11,15 +12,15 @@


 @register_check(importance=Importance.CRITICAL, neurodata_type=ImageSeries)
-def check_image_series_external_file_valid(image_series: ImageSeries):
+def check_image_series_external_file_valid(image_series: ImageSeries) -> Optional[Iterable[InspectorMessage]]:
     """
     Check if the external_file specified by an ImageSeries actually exists.
     Best Practice: :ref:`best_practice_use_external_mode`
     """
     if image_series.external_file is None:
-        return
-    nwbfile_path = Path(get_nwbfile_path_from_internal_object(obj=image_series))
+        return None
+    nwbfile_path = Path(get_nwbfile_path_from_internal_object(neurodata_object=image_series))
     for file_path in image_series.external_file:
         file_path = file_path.decode() if isinstance(file_path, bytes) else file_path
         if not Path(file_path).is_absolute() and not (nwbfile_path.parent / file_path).exists():
@@ -30,16 +31,18 @@ def check_image_series_external_file_valid(image_series: ImageSeries):
                 )
             )

+    return None
+

 @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=ImageSeries)
-def check_image_series_external_file_relative(image_series: ImageSeries):
+def check_image_series_external_file_relative(image_series: ImageSeries) -> Optional[Iterable[InspectorMessage]]:
     """
     Check if the external_file specified by an ImageSeries, if it exists, is relative.

     Best Practice: :ref:`best_practice_use_external_mode`
     """
     if image_series.external_file is None:
-        return
+        return None
     for file_path in image_series.external_file:
         file_path = file_path.decode() if isinstance(file_path, bytes) else file_path
         if ntpath.isabs(file_path):  # ntpath required for cross-platform detection
@@ -50,9 +53,11 @@ def check_image_series_external_file_relative(image_series: ImageSeries):
                 )
             )

+    return None
+

 @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=ImageSeries)
-def check_image_series_data_size(image_series: ImageSeries, gb_lower_bound: float = 20.0):
+def check_image_series_data_size(image_series: ImageSeries, gb_lower_bound: float = 20.0) -> Optional[InspectorMessage]:
     """
     Check if an ImageSeries stored is larger than gb_lower_bound and suggests external file.

@@ -61,7 +66,7 @@ def check_image_series_data_size(image_series: ImageSeries, gb_lower_bound: floa
     # False positive case; TwoPhotonSeries are a subclass of ImageSeries, but it is very common and perfectly fine
     # to write lots of data using one without an external file
     if isinstance(image_series, TwoPhotonSeries):
-        return
+        return None

     data = image_series.data
@@ -72,3 +77,5 @@ def check_image_series_data_size(image_series: ImageSeries, gb_lower_bound: floa

     if data_size_gb > gb_lower_bound:
         return InspectorMessage(message="ImageSeries is very large. Consider using external mode for better storage.")
+
+    return None
diff --git a/src/nwbinspector/checks/_images.py b/src/nwbinspector/checks/_images.py
index a68f375e7..4ae9b419e 100644
--- a/src/nwbinspector/checks/_images.py
+++ b/src/nwbinspector/checks/_images.py
@@ -1,5 +1,7 @@
 """Checks specific to the Images neurodata type."""

+from typing import Optional
+
 from pynwb.base import Images
 from pynwb.image import IndexSeries

@@ -7,30 +9,36 @@


 @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=Images)
-def check_order_of_images_unique(images: Images):
+def check_order_of_images_unique(images: Images) -> Optional[InspectorMessage]:
     """Check that all the values in the order_of_images field of an Images object are unique."""
     if images.order_of_images is None:
-        return
+        return None
     if not len(set(images.order_of_images)) == len(images.order_of_images):
         return InspectorMessage(message="order_of_images should have unique values.")

+    return None
+

 @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=Images)
-def check_order_of_images_len(images: Images):
+def check_order_of_images_len(images: Images) -> Optional[InspectorMessage]:
     """Check that the length of the order_of_images field of an Images object matches the number of images."""
     if images.order_of_images is None:
-        return
+        return None
     if not len(images.order_of_images) == len(images.images):
         return InspectorMessage(
             message=f"Length of order_of_images ({len(images.order_of_images)}) does not match the number of "
             f"images ({len(images.images)})."
         )

+    return None
+

 @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=IndexSeries)
-def check_index_series_points_to_image(index_series: IndexSeries):
+def check_index_series_points_to_image(index_series: IndexSeries) -> Optional[InspectorMessage]:
     if index_series.indexed_timeseries is not None:
         return InspectorMessage(
             message="Pointing an IndexSeries to a TimeSeries will be deprecated. Please point to an Images "
             "container instead."
         )
+
+    return None
diff --git a/src/nwbinspector/checks/_nwb_containers.py b/src/nwbinspector/checks/_nwb_containers.py
index 2d45c39fa..d48d1706f 100644
--- a/src/nwbinspector/checks/_nwb_containers.py
+++ b/src/nwbinspector/checks/_nwb_containers.py
@@ -1,6 +1,7 @@
 """Check functions that can apply to any object inside an NWBContainer."""

 import os
+from typing import Iterable, Optional

 import h5py
 import zarr
@@ -10,7 +11,9 @@


 @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=NWBContainer)
-def check_large_dataset_compression(nwb_container: NWBContainer, gb_lower_bound: float = 20.0):
+def check_large_dataset_compression(
+    nwb_container: NWBContainer, gb_lower_bound: float = 20.0
+) -> Optional[InspectorMessage]:
     """
     If the data in the Container object is a 'large' h5py.Dataset, check if it has compression enabled.

@@ -35,6 +38,8 @@ def check_large_dataset_compression(nwb_container: NWBContainer, gb_lower_bound:
             message=f"{os.path.split(field.name)[1]} is a large uncompressed dataset! Please enable compression.",
         )

+    return None
+

 @register_check(importance=Importance.BEST_PRACTICE_SUGGESTION, neurodata_type=NWBContainer)
 def check_small_dataset_compression(
@@ -42,7 +47,7 @@ def check_small_dataset_compression(
     gb_severity_threshold: float = 10.0,
     mb_lower_bound: float = 50.0,
     gb_upper_bound: float = 20.0,  # 20 GB upper bound to prevent double-raise
-):
+) -> Optional[InspectorMessage]:
     """
     If the data in the Container object is a h5py.Dataset, check if it has compression enabled.

@@ -77,9 +82,11 @@ def check_small_dataset_compression(
             ),
         )

+    return None
+

 @register_check(importance=Importance.BEST_PRACTICE_SUGGESTION, neurodata_type=NWBContainer)
-def check_empty_string_for_optional_attribute(nwb_container: NWBContainer):
+def check_empty_string_for_optional_attribute(nwb_container: NWBContainer) -> Optional[Iterable[InspectorMessage]]:
     """
     Check if any NWBContainer has optional fields that are written as an empty string.

@@ -101,3 +108,5 @@ def check_empty_string_for_optional_attribute(nwb_container: NWBContainer):
             message=f'The attribute "{field}" is optional and you have supplied an empty string. Improve by omitting '
             "this attribute (in MatNWB or PyNWB) or entering as None (in PyNWB)"
         )
+
+    return None
diff --git a/src/nwbinspector/checks/_nwbfile_metadata.py b/src/nwbinspector/checks/_nwbfile_metadata.py
index 301fd358a..048395032 100644
--- a/src/nwbinspector/checks/_nwbfile_metadata.py
+++ b/src/nwbinspector/checks/_nwbfile_metadata.py
@@ -2,6 +2,7 @@

 import re
 from datetime import datetime
+from typing import Iterable, Optional

 from isodate import Duration, parse_duration
 from pynwb import NWBFile, ProcessingModule
@@ -20,7 +21,7 @@


 @register_check(importance=Importance.BEST_PRACTICE_SUGGESTION, neurodata_type=NWBFile)
-def check_session_start_time_old_date(nwbfile: NWBFile):
+def check_session_start_time_old_date(nwbfile: NWBFile) -> Optional[InspectorMessage]:
     """
     Check if the session_start_time was set to an appropriate value.

@@ -38,9 +39,11 @@ def check_session_start_time_old_date(nwbfile: NWBFile):
             message=(f"The session_start_time ({session_start_time}) may not be set to the true date of the recording.")
         )

+    return None
+

 @register_check(importance=Importance.CRITICAL, neurodata_type=NWBFile)
-def check_session_start_time_future_date(nwbfile: NWBFile):
+def check_session_start_time_future_date(nwbfile: NWBFile) -> Optional[InspectorMessage]:
     """
     Check if the session_start_time was set to an appropriate value.

@@ -57,9 +60,11 @@ def check_session_start_time_future_date(nwbfile: NWBFile):
             message=f"The session_start_time ({session_start_time}) is set to a future date and time."
         )

+    return None
+

 @register_check(importance=Importance.BEST_PRACTICE_SUGGESTION, neurodata_type=NWBFile)
-def check_experimenter_exists(nwbfile: NWBFile):
+def check_experimenter_exists(nwbfile: NWBFile) -> Optional[InspectorMessage]:
     """
     Check if an experimenter has been added for the session.

@@ -68,16 +73,18 @@ def check_experimenter_exists(nwbfile: NWBFile):
     if not nwbfile.experimenter:
         return InspectorMessage(message="Experimenter is missing.")

+    return None
+

 @register_check(importance=Importance.BEST_PRACTICE_SUGGESTION, neurodata_type=NWBFile)
-def check_experimenter_form(nwbfile: NWBFile):
+def check_experimenter_form(nwbfile: NWBFile) -> Optional[Iterable[InspectorMessage]]:
     """
     Check the text form of each experimenter to see if it matches the DANDI regex pattern.
     Best Practice: :ref:`best_practice_experimenter`
     """
     if nwbfile.experimenter is None:
-        return
+        return None
     if is_module_installed(module_name="dandi"):
         from dandischema.models import (
             NAME_PATTERN,  # for most up to date version of the regex
         )
@@ -95,42 +102,52 @@ def check_experimenter_form(nwbfile: NWBFile):
                 )
             )

+    return None
+

 @register_check(importance=Importance.BEST_PRACTICE_SUGGESTION, neurodata_type=NWBFile)
-def check_experiment_description(nwbfile: NWBFile):
+def check_experiment_description(nwbfile: NWBFile) -> Optional[InspectorMessage]:
     """Check if a description has been added for the session."""
     if not nwbfile.experiment_description:
         return InspectorMessage(message="Experiment description is missing.")

+    return None
+

 @register_check(importance=Importance.BEST_PRACTICE_SUGGESTION, neurodata_type=NWBFile)
-def check_institution(nwbfile: NWBFile):
+def check_institution(nwbfile: NWBFile) -> Optional[InspectorMessage]:
     """Check if an institution has been added for the session."""
     if not nwbfile.institution:
         return InspectorMessage(message="Metadata /general/institution is missing.")

+    return None
+

 @register_check(importance=Importance.BEST_PRACTICE_SUGGESTION, neurodata_type=NWBFile)
-def check_keywords(nwbfile: NWBFile):
+def check_keywords(nwbfile: NWBFile) -> Optional[InspectorMessage]:
     """Check if keywords have been added for the session."""
     if not nwbfile.keywords:
         return InspectorMessage(message="Metadata /general/keywords is missing.")

+    return None
+

 @register_check(importance=Importance.BEST_PRACTICE_SUGGESTION, neurodata_type=NWBFile)
-def check_subject_exists(nwbfile: NWBFile):
+def check_subject_exists(nwbfile: NWBFile) -> Optional[InspectorMessage]:
     """Check if subject exists."""
     if nwbfile.subject is None:
         return InspectorMessage(message="Subject is missing.")

+    return None
+

 @register_check(importance=Importance.BEST_PRACTICE_SUGGESTION, neurodata_type=NWBFile)
-def check_doi_publications(nwbfile: NWBFile):
+def check_doi_publications(nwbfile: NWBFile) -> Optional[Iterable[InspectorMessage]]:
     """Check if related_publications has been properly added as 'doi: ###' or an external 'doi' link."""
     valid_starts = ["doi:", "http://dx.doi.org/", "https://doi.org/"]

     if not nwbfile.related_publications:
-        return
+        return None
     for publication in nwbfile.related_publications:
         publication = publication.decode() if isinstance(publication, bytes) else publication
         if not any((publication.startswith(valid_start) for valid_start in valid_starts)):
@@ -141,18 +158,20 @@ def check_doi_publications(nwbfile: NWBFile):
                 )
             )

+    return None
+

 @register_check(importance=Importance.BEST_PRACTICE_SUGGESTION, neurodata_type=Subject)
-def check_subject_age(subject: Subject):
+def check_subject_age(subject: Subject) -> Optional[InspectorMessage]:
     """Check if the Subject age is in ISO 8601 or our extension of it for ranges."""
     if subject.age is None and subject.date_of_birth is None:
         return InspectorMessage(
             message="Subject is missing age and date_of_birth. Please specify at least one of these fields."
         )
     elif subject.age is None and subject.date_of_birth is not None:
-        return
+        return None
 
     if re.fullmatch(pattern=duration_regex, string=subject.age):
-        return
+        return None
 
     if "/" in subject.age:
         subject_lower_age_bound, subject_upper_age_bound = subject.age.split("/")
@@ -160,7 +179,7 @@ def check_subject_age(subject: Subject):
         if re.fullmatch(pattern=duration_regex, string=subject_lower_age_bound) and (
             re.fullmatch(pattern=duration_regex, string=subject_upper_age_bound) or subject_upper_age_bound == ""
         ):
-            return
+            return None
 
     return InspectorMessage(
         message=(
@@ -173,7 +192,7 @@ def check_subject_age(subject: Subject):
 
 
 @register_check(importance=Importance.BEST_PRACTICE_SUGGESTION, neurodata_type=Subject)
-def check_subject_proper_age_range(subject: Subject):
+def check_subject_proper_age_range(subject: Subject) -> Optional[InspectorMessage]:
     """
     Check if the Subject age, if specified as duration range (e.g., 'P1D/P3D'), has properly increasing bounds.
 
@@ -201,30 +220,38 @@ def check_subject_proper_age_range(subject: Subject):
             )
         )
 
+    return None
+
 
 @register_check(importance=Importance.BEST_PRACTICE_SUGGESTION, neurodata_type=Subject)
-def check_subject_id_exists(subject: Subject):
+def check_subject_id_exists(subject: Subject) -> Optional[InspectorMessage]:
     """Check if subject_id is defined."""
     if subject.subject_id is None:
         return InspectorMessage(message="subject_id is missing.")
 
+    return None
+
 
-def _check_subject_sex_defaults(sex: str):
+def _check_subject_sex_defaults(sex: str) -> Optional[InspectorMessage]:
    """Check if the subject sex has been specified properly using the default vocabulary."""
     if sex not in ("M", "F", "O", "U"):
         return InspectorMessage(
             message="Subject.sex should be one of: 'M' (male), 'F' (female), 'O' (other), or 'U' (unknown)."
         )
 
+    return None
 
-def _check_subject_sex_c_elegans(sex: str):
+
+def _check_subject_sex_c_elegans(sex: str) -> Optional[InspectorMessage]:
     """Check if the subject sex has been specified properly for the C. elegans species."""
     if sex not in ("XO", "XX"):
         return InspectorMessage(message="For C. elegans, Subject.sex should be 'XO' (male) or 'XX' (hermaphrodite).")
 
+    return None
+
 
 @register_check(importance=Importance.BEST_PRACTICE_SUGGESTION, neurodata_type=Subject)
-def check_subject_sex(subject: Subject):
+def check_subject_sex(subject: Subject) -> Optional[InspectorMessage]:
     """
     Check if the subject sex has been specified and ensure that it has the correct form depending on the species.
 
@@ -237,9 +264,11 @@ def check_subject_sex(subject: Subject):
     else:
         return _check_subject_sex_defaults(sex=subject.sex)
 
+    return None
+
 
 @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=Subject)
-def check_subject_species_exists(subject: Subject):
+def check_subject_species_exists(subject: Subject) -> Optional[InspectorMessage]:
     """
     Check if the subject species has been specified.
 
@@ -248,9 +277,11 @@ def check_subject_species_exists(subject: Subject):
     if not subject.species:
         return InspectorMessage(message="Subject species is missing.")
 
+    return None
+
 
 @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=Subject)
-def check_subject_species_form(subject: Subject):
+def check_subject_species_form(subject: Subject) -> Optional[InspectorMessage]:
     """
     Check if the subject species follows Latin binomial form or is a link to an NCBI taxonomy in the form of a Term IRI.
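For reference, the age logic above accepts either a single ISO 8601 duration (e.g. 'P90D') or a 'lower/upper' range (e.g. 'P1D/P3D', with an open upper bound allowed). A standalone sketch of the same acceptance rule, where DURATION_REGEX is a simplified stand-in for the package's stricter `duration_regex`:

import re

# Simplified stand-in for the package's `duration_regex`; the real pattern is stricter.
DURATION_REGEX = r"P(?:\d+Y)?(?:\d+M)?(?:\d+W)?(?:\d+D)?(?:T(?:\d+H)?(?:\d+M)?(?:\d+S)?)?"


def is_valid_age(age: str) -> bool:
    """Accept 'P90D'-style durations or 'P1D/P3D'-style ranges (open upper bound allowed)."""
    if re.fullmatch(pattern=DURATION_REGEX, string=age):
        return True
    if "/" in age:
        lower, upper = age.split("/")
        lower_ok = re.fullmatch(pattern=DURATION_REGEX, string=lower) is not None
        upper_ok = upper == "" or re.fullmatch(pattern=DURATION_REGEX, string=upper) is not None
        return lower_ok and upper_ok
    return False


assert is_valid_age("P90D")
assert is_valid_age("P1D/P3D")
assert is_valid_age("P1D/")
assert not is_valid_age("90 days")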
@@ -266,9 +297,11 @@ def check_subject_species_form(subject: Subject):
             ),
         )
 
+    return None
+
 
 @register_check(importance=Importance.BEST_PRACTICE_SUGGESTION, neurodata_type=ProcessingModule)
-def check_processing_module_name(processing_module: ProcessingModule):
+def check_processing_module_name(processing_module: ProcessingModule) -> Optional[InspectorMessage]:
     """Check if the name of a processing module is of a valid modality."""
     if processing_module.name not in PROCESSING_MODULE_CONFIG:
         return InspectorMessage(
@@ -277,3 +310,5 @@ def check_processing_module_name(processing_module: ProcessingModule):
                 f"schema module names: {', '.join(PROCESSING_MODULE_CONFIG)}"
             )
         )
+
+    return None
diff --git a/src/nwbinspector/checks/_ogen.py b/src/nwbinspector/checks/_ogen.py
index 5b0a2af5a..27e8a08cd 100644
--- a/src/nwbinspector/checks/_ogen.py
+++ b/src/nwbinspector/checks/_ogen.py
@@ -1,14 +1,19 @@
+from typing import Optional
+
 from pynwb.ogen import OptogeneticSeries, OptogeneticStimulusSite
 
 from .._registration import Importance, InspectorMessage, register_check
 
 
 @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=OptogeneticStimulusSite)
-def check_optogenetic_stimulus_site_has_optogenetic_series(ogen_site: OptogeneticStimulusSite):
+def check_optogenetic_stimulus_site_has_optogenetic_series(
+    ogen_site: OptogeneticStimulusSite,
+) -> Optional[InspectorMessage]:
     """Check if an optogenetic stimulus site has an optogenetic series linked to it."""
     nwbfile = ogen_site.get_ancestor("NWBFile")
     for obj in nwbfile.objects.values():
         if isinstance(obj, OptogeneticSeries):
             if obj.site == ogen_site:
-                return
+                return None
+
     return InspectorMessage(message="OptogeneticStimulusSite is not referenced by any OptogeneticSeries.")
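The _ogen.py check above is an instance of the reverse-reference scan pattern: to learn whether a stimulus site is actually used, walk every object registered on the ancestor NWBFile and look for a series that points back at it. The same idea in isolation (the helper name is hypothetical; `objects` and `site` are the real pynwb attributes):

from pynwb import NWBFile
from pynwb.ogen import OptogeneticSeries, OptogeneticStimulusSite


def site_is_referenced(nwbfile: NWBFile, site: OptogeneticStimulusSite) -> bool:
    """Return True if any OptogeneticSeries in the file points back at `site`."""
    return any(
        neurodata_object.site is site
        for neurodata_object in nwbfile.objects.values()
        if isinstance(neurodata_object, OptogeneticSeries)
    )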
diff --git a/src/nwbinspector/checks/_ophys.py b/src/nwbinspector/checks/_ophys.py
index c28b8119b..38e9f9711 100644
--- a/src/nwbinspector/checks/_ophys.py
+++ b/src/nwbinspector/checks/_ophys.py
@@ -1,5 +1,7 @@
 """Check functions specific to optical physiology neurodata types."""
 
+from typing import Iterable, Optional
+
 from pynwb.ophys import (
     ImagingPlane,
     OpticalChannel,
@@ -14,7 +16,7 @@
 
 
 @register_check(importance=Importance.CRITICAL, neurodata_type=RoiResponseSeries)
-def check_roi_response_series_dims(roi_response_series: RoiResponseSeries):
+def check_roi_response_series_dims(roi_response_series: RoiResponseSeries) -> Optional[InspectorMessage]:
     """
     Check the dimensions of an ROI series to ensure the time axis is the correct dimension.
 
@@ -37,11 +39,13 @@ def check_roi_response_series_dims(roi_response_series: RoiResponseSeries):
             message="The second dimension of data does not match the length of rois. Your data may be transposed."
         )
 
+    return None
+
 
 @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=RoiResponseSeries)
 def check_roi_response_series_link_to_plane_segmentation(
     roi_response_series: RoiResponseSeries,
-):
+) -> Optional[InspectorMessage]:
     """
     Check that each ROI response series links to a plane segmentation.
@@ -50,9 +54,11 @@ def check_roi_response_series_link_to_plane_segmentation(
     if not isinstance(roi_response_series.rois.table, PlaneSegmentation):
         return InspectorMessage(message="rois field does not point to a PlaneSegmentation table.")
 
+    return None
+
 
 @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=OpticalChannel)
-def check_emission_lambda_in_nm(optical_channel: OpticalChannel):
+def check_emission_lambda_in_nm(optical_channel: OpticalChannel) -> Optional[InspectorMessage]:
     """
     Check that emission lambda is in a feasible range for unit nanometers.
 
@@ -61,9 +67,11 @@ def check_emission_lambda_in_nm(optical_channel: OpticalChannel):
     if optical_channel.emission_lambda < MIN_LAMBDA:
         return InspectorMessage(f"emission lambda of {optical_channel.emission_lambda} should be in units of nm.")
 
+    return None
+
 
 @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=ImagingPlane)
-def check_excitation_lambda_in_nm(imaging_plane: ImagingPlane):
+def check_excitation_lambda_in_nm(imaging_plane: ImagingPlane) -> Optional[InspectorMessage]:
     """
     Check that excitation lambda is in a feasible range for unit nanometers.
 
@@ -72,9 +80,13 @@ def check_excitation_lambda_in_nm(imaging_plane: ImagingPlane):
     if imaging_plane.excitation_lambda < MIN_LAMBDA:
         return InspectorMessage(f"excitation lambda of {imaging_plane.excitation_lambda} should be in units of nm.")
 
+    return None
+
 
 @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=PlaneSegmentation)
-def check_plane_segmentation_image_mask_shape_against_ref_images(plane_segmentation: PlaneSegmentation):
+def check_plane_segmentation_image_mask_shape_against_ref_images(
+    plane_segmentation: PlaneSegmentation,
+) -> Optional[Iterable[InspectorMessage]]:
     """Check that image masks and reference images have the same shape."""
     if plane_segmentation.reference_images and "image_mask" in plane_segmentation.colnames:
         mask_shape = plane_segmentation["image_mask"].shape[1:]
@@ -84,3 +96,5 @@ def check_plane_segmentation_image_mask_shape_against_ref_images(plane_segmentat
                     f"image_mask of shape {mask_shape} does not match reference image {ref_image.name} with shape"
                     f" {ref_image.data.shape[1:]}."
                 )
+
+    return None
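Checks that can flag several findings at once, like the image-mask check that closes the file above, are written as generators and annotated `-> Optional[Iterable[InspectorMessage]]`, yielding one message per offense. A minimal sketch of that variant (the check itself is hypothetical):

from typing import Iterable, Optional

from hdmf.common import DynamicTable

from nwbinspector._registration import Importance, InspectorMessage, register_check


@register_check(importance=Importance.BEST_PRACTICE_SUGGESTION, neurodata_type=DynamicTable)
def check_column_names_lowercase(table: DynamicTable) -> Optional[Iterable[InspectorMessage]]:
    """Hypothetical generator check: yield one message per non-lowercase column name."""
    for column_name in table.colnames:
        if column_name != column_name.lower():
            yield InspectorMessage(message=f"Column '{column_name}' is not lowercase.")

    # A bare `return None` in a generator simply ends the iteration.
    return None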
diff --git a/src/nwbinspector/checks/_tables.py b/src/nwbinspector/checks/_tables.py
index daf988768..03456febf 100644
--- a/src/nwbinspector/checks/_tables.py
+++ b/src/nwbinspector/checks/_tables.py
@@ -1,7 +1,7 @@
 """Check functions that can apply to any descendant of DynamicTable."""
 
 from numbers import Real
-from typing import List, Optional
+from typing import Iterable, Optional
 
 import numpy as np
 from hdmf.common import DynamicTable, DynamicTableRegion, VectorIndex
@@ -21,7 +21,9 @@
 
 
 @register_check(importance=Importance.CRITICAL, neurodata_type=DynamicTableRegion)
-def check_dynamic_table_region_data_validity(dynamic_table_region: DynamicTableRegion, nelems: Optional[int] = NELEMS):
+def check_dynamic_table_region_data_validity(
+    dynamic_table_region: DynamicTableRegion, nelems: Optional[int] = NELEMS
+) -> Optional[InspectorMessage]:
     """Check if a DynamicTableRegion is valid."""
     if np.any(np.asarray(dynamic_table_region.data[:nelems]) > len(dynamic_table_region.table)):
         return InspectorMessage(
@@ -35,16 +37,22 @@ def check_dynamic_table_region_data_validity(dynamic_table_region: DynamicTableR
             message=f"Some elements of {dynamic_table_region.name} are out of range because they are less than 0."
         )
 
+    return None
+
 
 @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=DynamicTable)
-def check_empty_table(table: DynamicTable):
+def check_empty_table(table: DynamicTable) -> Optional[InspectorMessage]:
     """Check if a DynamicTable is empty."""
     if len(table.id) == 0:
         return InspectorMessage(message="This table has no data added to it.")
 
+    return None
+
 
 @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=TimeIntervals)
-def check_time_interval_time_columns(time_intervals: TimeIntervals, nelems: Optional[int] = NELEMS):
+def check_time_interval_time_columns(
+    time_intervals: TimeIntervals, nelems: Optional[int] = NELEMS
+) -> Optional[InspectorMessage]:
     """
     Check that time columns are in ascending order.
 
@@ -69,9 +77,13 @@ def check_time_interval_time_columns(time_intervals: TimeIntervals, nelems: Opti
             )
         )
 
+    return None
+
 
 @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=TimeIntervals)
-def check_time_intervals_stop_after_start(time_intervals: TimeIntervals, nelems: Optional[int] = NELEMS):
+def check_time_intervals_stop_after_start(
+    time_intervals: TimeIntervals, nelems: Optional[int] = NELEMS
+) -> Optional[InspectorMessage]:
     """
     Check that all stop times on a TimeIntervals object occur after their corresponding start times.
 
@@ -97,9 +109,13 @@ def check_time_intervals_stop_after_start(time_intervals: TimeIntervals, nelems:
             )
         )
 
+    return None
+
 
 @register_check(importance=Importance.BEST_PRACTICE_SUGGESTION, neurodata_type=DynamicTable)
-def check_column_binary_capability(table: DynamicTable, nelems: Optional[int] = NELEMS):
+def check_column_binary_capability(
+    table: DynamicTable, nelems: Optional[int] = NELEMS
+) -> Optional[Iterable[InspectorMessage]]:
     """
     Check each column of a table to see if the data could be set as a boolean dtype.
 
@@ -138,7 +154,7 @@ def check_column_binary_capability(table: DynamicTable, nelems: Optional[int] =
                 ["t", "f"],
                 ["hit", "miss"],
             ]
-            if any([set(parsed_unique_values) == set(pair) for pair in pairs_to_check]):
+            if any([set(parsed_unique_values) == set(pair) for pair in pairs_to_check]):  # type: ignore
                 saved_bytes = (unique_values.dtype.itemsize - 1) * np.product(
                     get_data_shape(data=column.data, strict_no_data_load=True)
                 )
@@ -157,32 +173,40 @@ def check_column_binary_capability(table: DynamicTable, nelems: Optional[int] =
                     f"save {format_byte_size(byte_size=saved_bytes)}."
                 )
             )
+    return None
 
 
 @register_check(importance=Importance.BEST_PRACTICE_SUGGESTION, neurodata_type=DynamicTable)
 def check_single_row(
     table: DynamicTable,
-    exclude_types: Optional[list] = (Units,),
-    exclude_names: Optional[List[str]] = ("electrodes",),
-):
+    exclude_types: Optional[tuple] = None,
+    exclude_names: Optional[tuple[str, ...]] = None,
+) -> Optional[InspectorMessage]:
     """
     Check if DynamicTable has only a single row; may be better represented by another data type.
 
     Skips the Units table since it is OK to have only a single spiking unit.
     Skips the Electrode table since it is OK to have only a single electrode.
     """
+    exclude_types = exclude_types or (Units,)
+    exclude_names = exclude_names or ("electrodes",)
+
     if any((isinstance(table, exclude_type) for exclude_type in exclude_types)):
-        return
+        return None
     if any((table.name == exclude_name for exclude_name in exclude_names)):
-        return
+        return None
     if len(table.id) == 1:
         return InspectorMessage(
             message="This table has only a single row; it may be better represented by another data type."
) + return None + @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=DynamicTable) -def check_table_values_for_dict(table: DynamicTable, nelems: Optional[int] = NELEMS): +def check_table_values_for_dict( + table: DynamicTable, nelems: Optional[int] = NELEMS +) -> Optional[Iterable[InspectorMessage]]: """Check if any values in a row or column of a table contain a string casting of a Python dictionary.""" for column in table.columns: if not hasattr(column, "data") or isinstance(column, VectorIndex) or not isinstance(column.data[0], str): @@ -197,9 +221,11 @@ def check_table_values_for_dict(table: DynamicTable, nelems: Optional[int] = NEL message += " This string is also JSON loadable, so call `json.loads(...)` on the string to unpack." yield InspectorMessage(message=message) + return None + @register_check(importance=Importance.BEST_PRACTICE_SUGGESTION, neurodata_type=DynamicTable) -def check_col_not_nan(table: DynamicTable, nelems: Optional[int] = NELEMS): +def check_col_not_nan(table: DynamicTable, nelems: Optional[int] = NELEMS) -> Optional[Iterable[InspectorMessage]]: """Check if all the values in a single column of a table are NaN.""" for column in table.columns: if ( @@ -217,9 +243,11 @@ def check_col_not_nan(table: DynamicTable, nelems: Optional[int] = NELEMS): if all(np.isnan(column[slice(0, None, slice_by)]).flatten()): yield InspectorMessage(message=message) + return None + @register_check(importance=Importance.CRITICAL, neurodata_type=DynamicTable) -def check_ids_unique(table: DynamicTable, nelems: Optional[int] = NELEMS): +def check_ids_unique(table: DynamicTable, nelems: Optional[int] = NELEMS) -> Optional[InspectorMessage]: """ Ensure that the values of the id attribute of a DynamicTable are unique. @@ -239,9 +267,11 @@ def check_ids_unique(table: DynamicTable, nelems: Optional[int] = NELEMS): if len(set(data)) != len(data): return InspectorMessage(message="This table has ids that are not unique.") + return None + @register_check(importance=Importance.BEST_PRACTICE_SUGGESTION, neurodata_type=DynamicTable) -def check_table_time_columns_are_not_negative(table: DynamicTable): +def check_table_time_columns_are_not_negative(table: DynamicTable) -> Optional[Iterable[InspectorMessage]]: """ Check that time columns are not negative. @@ -259,3 +289,5 @@ def check_table_time_columns_are_not_negative(table: DynamicTable): message=f"Timestamps in column {column_name} should not be negative." " It is recommended to align the `session_start_time` or `timestamps_reference_time` to be the earliest time value that occurs in the data, and shift all other signals accordingly." 
) + + return None diff --git a/src/nwbinspector/checks/_time_series.py b/src/nwbinspector/checks/_time_series.py index 4086a6ed5..8956171fd 100644 --- a/src/nwbinspector/checks/_time_series.py +++ b/src/nwbinspector/checks/_time_series.py @@ -1,5 +1,7 @@ """Check functions that can apply to any descendant of TimeSeries.""" +from typing import Optional + import numpy as np from pynwb import TimeSeries from pynwb.image import ImageSeries, IndexSeries @@ -11,7 +13,7 @@ @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=TimeSeries) def check_regular_timestamps( time_series: TimeSeries, time_tolerance_decimals: int = 9, gb_severity_threshold: float = 1.0 -): +) -> Optional[InspectorMessage]: """If the TimeSeries uses timestamps, check if they are regular (i.e., they have a constant rate).""" if ( time_series.timestamps is not None @@ -33,14 +35,19 @@ def check_regular_timestamps( ), ) + return None + @register_check(importance=Importance.CRITICAL, neurodata_type=TimeSeries) -def check_data_orientation(time_series: TimeSeries): +def check_data_orientation(time_series: TimeSeries) -> Optional[InspectorMessage]: """If the TimeSeries has data, check if the longest axis (almost always time) is also the zero-axis.""" if time_series.data is None: - return + return None data_shape = get_data_shape(time_series.data) + if data_shape is None: + return None + if any(np.array(data_shape[1:]) > data_shape[0]): return InspectorMessage( message=( @@ -49,19 +56,29 @@ def check_data_orientation(time_series: TimeSeries): ), ) + return None + @register_check(importance=Importance.CRITICAL, neurodata_type=TimeSeries) -def check_timestamps_match_first_dimension(time_series: TimeSeries): +def check_timestamps_match_first_dimension(time_series: TimeSeries) -> Optional[InspectorMessage]: """ If the TimeSeries has timestamps, check if their length is the same as the zero-axis of data. 
Best Practice: :ref:`best_practice_data_orientation` """ if time_series.data is None or time_series.timestamps is None: - return + return None + + data_shape = get_data_shape(time_series.data) + if data_shape is None: + return None + + timestamps_shape = get_data_shape(time_series.timestamps) + if timestamps_shape is None: + return None - if getattr(time_series, "external_file", None) is not None and get_data_shape(time_series.data)[0] == 0: - return + if getattr(time_series, "external_file", None) is not None and data_shape[0] == 0: + return None # A very specific edge case where this has been allowed, though much more preferable # to use a stack of Images rather than an ImageSeries @@ -72,10 +89,8 @@ def check_timestamps_match_first_dimension(time_series: TimeSeries): ): for neurodata_object in time_series.get_ancestor("NWBFile").objects.values(): if isinstance(neurodata_object, IndexSeries) and neurodata_object.indexed_timeseries == time_series: - return + return None - timestamps_shape = get_data_shape(time_series.timestamps) - data_shape = get_data_shape(time_series.data) if data_shape[0] != timestamps_shape[0]: return InspectorMessage( message=( @@ -84,23 +99,29 @@ def check_timestamps_match_first_dimension(time_series: TimeSeries): ) ) + return None + @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=TimeSeries) -def check_timestamps_ascending(time_series: TimeSeries, nelems=200): +def check_timestamps_ascending(time_series: TimeSeries, nelems: Optional[int] = 200) -> Optional[InspectorMessage]: """Check that the values in the timestamps array are strictly increasing.""" if time_series.timestamps is not None and not is_ascending_series(time_series.timestamps, nelems=nelems): return InspectorMessage(f"{time_series.name} timestamps are not ascending.") + return None + @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=TimeSeries) -def check_timestamps_without_nans(time_series: TimeSeries, nelems=200): +def check_timestamps_without_nans(time_series: TimeSeries, nelems: Optional[int] = 200) -> Optional[InspectorMessage]: """Check if there are NaN values in the timestamps array.""" if time_series.timestamps is not None and np.isnan(time_series.timestamps[:nelems]).any(): return InspectorMessage(message=f"{time_series.name} timestamps contain NaN values.") + return None + @register_check(importance=Importance.BEST_PRACTICE_SUGGESTION, neurodata_type=TimeSeries) -def check_timestamp_of_the_first_sample_is_not_negative(time_series: TimeSeries): +def check_timestamp_of_the_first_sample_is_not_negative(time_series: TimeSeries) -> Optional[InspectorMessage]: """ Check that the timestamp of the first sample is not negative. @@ -114,9 +135,11 @@ def check_timestamp_of_the_first_sample_is_not_negative(time_series: TimeSeries) " It is recommended to align the `session_start_time` or `timestamps_reference_time` to be the earliest time value that occurs in the data, and shift all other signals accordingly." ) + return None + @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=TimeSeries) -def check_missing_unit(time_series: TimeSeries): +def check_missing_unit(time_series: TimeSeries) -> Optional[InspectorMessage]: """ Check if the TimeSeries.unit field is empty. @@ -127,24 +150,34 @@ def check_missing_unit(time_series: TimeSeries): message="Missing text for attribute 'unit'. Please specify the scientific unit of the 'data'." 
         )
 
+    return None
+
 
 @register_check(importance=Importance.BEST_PRACTICE_VIOLATION, neurodata_type=TimeSeries)
-def check_resolution(time_series: TimeSeries):
+def check_resolution(time_series: TimeSeries) -> Optional[InspectorMessage]:
     """Check the resolution value of a TimeSeries for proper format (-1.0 or NaN for unknown)."""
     if time_series.resolution is None or time_series.resolution == -1.0:
-        return
+        return None
 
     if time_series.resolution <= 0:
         return InspectorMessage(
             message=f"'resolution' should use -1.0 or NaN for unknown instead of {time_series.resolution}."
         )
 
+    return None
+
 
 @register_check(importance=Importance.CRITICAL, neurodata_type=TimeSeries)
-def check_rate_is_not_zero(time_series: TimeSeries):
+def check_rate_is_not_zero(time_series: TimeSeries) -> Optional[InspectorMessage]:
+    """Check that a TimeSeries with more than one frame does not have a sampling rate of zero."""
     if time_series.data is None:
-        return
+        return None
+
     data_shape = get_data_shape(time_series.data)
+    if data_shape is None:
+        return None
+
     if time_series.rate == 0.0 and data_shape[0] > 1:
         return InspectorMessage(
             f"{time_series.name} has a sampling rate value of 0.0Hz but the series has more than one frame."
         )
+
+    return None
diff --git a/src/nwbinspector/inspector_tools/__init__.py b/src/nwbinspector/inspector_tools/__init__.py
index 134b1f7b6..6cf0e655e 100644
--- a/src/nwbinspector/inspector_tools/__init__.py
+++ b/src/nwbinspector/inspector_tools/__init__.py
@@ -9,5 +9,12 @@
 # Still keep imports functional with warning for soft deprecation cycle
 # TODO: remove after 9/15/2024
-from .._organization import organize_messages, _get_report_header
-from .._formatting import format_message, MessageFormatter, FormatterOptions, print_to_console, save_report
+from .._organization import organize_messages
+from .._formatting import (
+    format_messages,
+    MessageFormatter,
+    FormatterOptions,
+    print_to_console,
+    save_report,
+    _get_report_header,
+)
diff --git a/src/nwbinspector/testing/_testing.py b/src/nwbinspector/testing/_testing.py
index 68b7629ef..ed05b8f42 100644
--- a/src/nwbinspector/testing/_testing.py
+++ b/src/nwbinspector/testing/_testing.py
@@ -7,6 +7,7 @@
 from urllib import request
 from uuid import uuid4
 
+import zarr
 from hdmf.backends.hdf5 import HDF5IO
 from hdmf.backends.io import HDMFIO
 from packaging.version import Version
@@ -15,6 +16,8 @@
 
 from ..utils import get_package_version, is_module_installed, strtobool
 
+TESTING_FILES_FOLDER_PATH = os.environ.get("TESTING_FILES_FOLDER_PATH", None)
+
 
 def check_streaming_tests_enabled() -> tuple[bool, Optional[str]]:
     """
@@ -23,17 +26,17 @@ def check_streaming_tests_enabled() -> tuple[bool, Optional[str]]:
     Returns the boolean status of the check and, if False, provides a string reason for the failure for the user
     to utilize as they please (raise an error or warning with that message, print it, or ignore it).
     """
-    failure_reason = ""
+    failure_reason: str = ""
 
     environment_skip_flag = os.environ.get("NWBI_SKIP_NETWORK_TESTS", "")
     environment_skip_flag_bool = (
         strtobool(os.environ.get("NWBI_SKIP_NETWORK_TESTS", "")) if environment_skip_flag != "" else False
     )
     if environment_skip_flag_bool:
-        failure_reason += "Environmental variable set to skip network tets."
+        failure_reason += "Environment variable set to skip network tests."
streaming_enabled, streaming_failure_reason = check_streaming_enabled() - if not streaming_enabled: + if not streaming_enabled and streaming_failure_reason is not None: failure_reason += streaming_failure_reason have_dandi = is_module_installed("dandi") @@ -44,26 +47,28 @@ def check_streaming_tests_enabled() -> tuple[bool, Optional[str]]: if not have_remfile: failure_reason += "The `remfile` package is not installed on the system." - failure_reason = None if failure_reason == "" else failure_reason - return streaming_enabled and not environment_skip_flag_bool and have_dandi, failure_reason + return_failure_reason: Optional[str] = None + if failure_reason != "": + return_failure_reason = failure_reason + + return streaming_enabled and not environment_skip_flag_bool and have_dandi, return_failure_reason -def generate_testing_files(): # pragma: no cover +def generate_testing_files() -> None: # pragma: no cover """Generate a local copy of the NWB files required for all tests.""" generate_image_series_testing_files() -def generate_image_series_testing_files(): # pragma: no cover +def generate_image_series_testing_files() -> None: # pragma: no cover """Generate a local copy of the NWB files required for the image series tests.""" assert get_package_version(name="pynwb") == Version("2.1.0"), "Generating the testing files requires PyNWB v2.1.0!" + assert TESTING_FILES_FOLDER_PATH is not None, "The `TESTING_FILES_FOLDER_PATH` environment variable is not set!" - testing_config = load_testing_config() - - local_path = Path(testing_config["LOCAL_PATH"]) - local_path.mkdir(exist_ok=True, parents=True) + testing_folder = Path(TESTING_FILES_FOLDER_PATH) + testing_folder.mkdir(exist_ok=True) - movie_1_file_path = local_path / "temp_movie_1.mov" - nested_folder = local_path / "nested_folder" + movie_1_file_path = testing_folder / "temp_movie_1.mov" + nested_folder = testing_folder / "nested_folder" nested_folder.mkdir(exist_ok=True) movie_2_file_path = nested_folder / "temp_movie_2.avi" for file_path in [movie_1_file_path, movie_2_file_path]: @@ -91,11 +96,11 @@ def generate_image_series_testing_files(): # pragma: no cover nwbfile.add_acquisition( ImageSeries(name="TestImageSeriesExternalPathIsNotRelative", rate=1.0, external_file=[absolute_file_path]) ) - with NWBHDF5IO(path=local_path / "image_series_testing_file.nwb", mode="w") as io: + with NWBHDF5IO(path=testing_folder / "image_series_testing_file.nwb", mode="w") as io: io.write(nwbfile) -def make_minimal_nwbfile(): +def make_minimal_nwbfile() -> NWBFile: """ Most basic NWBFile that can exist. @@ -113,18 +118,18 @@ def check_streaming_enabled() -> tuple[bool, Optional[str]]: """ try: request.urlopen("https://dandiarchive.s3.amazonaws.com/ros3test.nwb", timeout=1) - except request.URLError: + except request.URLError: # type: ignore return False, "Internet access to DANDI failed." return True, None -def check_hdf5_io_open(io: HDF5IO): - """Check if an h5py.File object is open by using the file object's .id attribute, which is invalid when the file is closed.""" +def check_hdf5_io_open(io: HDF5IO) -> bool: + """ + Check if an h5py.File object is open by using the file .id attribute, which is invalid when the file is closed. 
+ """ return io._file.id.valid -def check_zarr_io_open(io: HDMFIO): +def check_zarr_io_open(io: HDMFIO) -> bool: """For Zarr, the private attribute `_ZarrIO__file` is set to a `zarr.group` on open.""" - import zarr # relative import since extra requirement - return isinstance(io._ZarrIO__file, zarr.Group) diff --git a/src/nwbinspector/tools/__init__.py b/src/nwbinspector/tools/__init__.py index 674f827a2..33612ae19 100644 --- a/src/nwbinspector/tools/__init__.py +++ b/src/nwbinspector/tools/__init__.py @@ -1,6 +1,6 @@ from ._dandi import get_s3_urls_and_dandi_paths from ._nwb import all_of_type, get_nwbfile_path_from_internal_object -from ._read_nwbfile import BACKEND_IO_CLASSES, read_nwbfile +from ._read_nwbfile import BACKEND_IO_CLASSES, read_nwbfile, read_nwbfile_and_io __all__ = [ "BACKEND_IO_CLASSES", @@ -8,4 +8,5 @@ "all_of_type", "get_nwbfile_path_from_internal_object", "read_nwbfile", + "read_nwbfile_and_io", ] diff --git a/src/nwbinspector/tools/_dandi.py b/src/nwbinspector/tools/_dandi.py index e1d338c5c..87fac36e2 100644 --- a/src/nwbinspector/tools/_dandi.py +++ b/src/nwbinspector/tools/_dandi.py @@ -46,7 +46,11 @@ def get_s3_urls_and_dandi_paths(dandiset_id: str, version_id: Optional[str] = No return s3_urls_to_dandi_paths -def _get_content_url_and_path(asset, follow_redirects: int = 1, strip_query: bool = True) -> dict[str, str]: +def _get_content_url_and_path( + asset: "dandi.dandiapi.BaseRemoteAsset", # type: ignore + follow_redirects: int = 1, + strip_query: bool = True, +) -> dict[str, str]: """ Private helper function for parallelization in 'get_s3_urls_and_dandi_paths'. diff --git a/src/nwbinspector/tools/_nwb.py b/src/nwbinspector/tools/_nwb.py index 24676220d..571007d71 100644 --- a/src/nwbinspector/tools/_nwb.py +++ b/src/nwbinspector/tools/_nwb.py @@ -1,17 +1,20 @@ """Helper functions related to NWB for internal use that rely on external dependencies (i.e., pynwb).""" +from typing import Iterable, Type + from pynwb import NWBFile -def all_of_type(nwbfile: NWBFile, neurodata_type): +def all_of_type(nwbfile: NWBFile, neurodata_type: Type) -> Iterable[object]: """Iterate over all objects inside an NWBFile object and return those that match the given neurodata_type.""" - for obj in nwbfile.objects.values(): - if isinstance(obj, neurodata_type): - yield obj + for neurodata_object in nwbfile.objects.values(): + if isinstance(neurodata_object, neurodata_type): + yield neurodata_object -def get_nwbfile_path_from_internal_object(obj): +def get_nwbfile_path_from_internal_object(neurodata_object: object) -> str: """Determine the file path on disk for a NWBFile given only an internal object of that file.""" - if isinstance(obj, NWBFile): - return obj.container_source - return obj.get_ancestor("NWBFile").container_source + if isinstance(neurodata_object, NWBFile): + return neurodata_object.container_source + + return neurodata_object.get_ancestor("NWBFile").container_source # type: ignore diff --git a/src/nwbinspector/tools/_read_nwbfile.py b/src/nwbinspector/tools/_read_nwbfile.py index 20fe30b08..de6d1eb51 100644 --- a/src/nwbinspector/tools/_read_nwbfile.py +++ b/src/nwbinspector/tools/_read_nwbfile.py @@ -15,35 +15,39 @@ ) -def _get_method(path: str): +def _get_method(path: str) -> Literal["local", "fsspec"]: if path.startswith(("https://", "http://", "s3://")): return "fsspec" elif Path(path).exists(): return "local" else: - raise ValueError( + message = ( f"Unable to automatically determine method. 
Path {path} does not appear to be a URL and is not a file on "
             f"the local filesystem."
         )
+        raise ValueError(message)
 
 
-def _init_fsspec(path):
+def _init_fsspec(path: str) -> "fsspec.AbstractFileSystem":  # type: ignore
     import fsspec
 
     if path.startswith(("https://", "http://")):
         return fsspec.filesystem("http")
     elif path.startswith("s3://"):
         return fsspec.filesystem("s3", anon=True)
+    else:
+        message = f"Unable to initialize fsspec on path '{path}'."
+        raise ValueError(message)
 
 
-def _get_backend(path: str, method: Literal["local", "fsspec", "ros3"]):
+def _get_backend(path: str, method: Literal["local", "fsspec", "ros3"]) -> Union[str, Literal["hdf5", "zarr"]]:
     if method == "ros3":
         return "hdf5"
 
     possible_backends = []
     if method == "fsspec":
-        fs = _init_fsspec(path=path)
-        with fs.open(path=path, mode="rb") as file:
+        filesystem = _init_fsspec(path=path)
+        with filesystem.open(path=path, mode="rb") as file:
             for backend_name, backend_class in BACKEND_IO_CLASSES.items():
                 if backend_class.can_read(path=file):
                     possible_backends.append(backend_name)
@@ -66,8 +70,40 @@ def read_nwbfile(
     nwbfile_path: Union[str, Path],
     method: Optional[Literal["local", "fsspec", "ros3"]] = None,
     backend: Optional[Literal["hdf5", "zarr"]] = None,
-    return_io: bool = False,
-) -> Union[NWBFile, tuple[NWBFile, HDMFIO]]:
+) -> NWBFile:
+    """
+    Read an NWB file using the specified (or auto-detected) method and specified (or auto-detected) backend.
+
+    Parameters
+    ----------
+    nwbfile_path : str or pathlib.Path
+        Path to the file on your system.
+    method : "local", "fsspec", "ros3", or None (default)
+        Where to read the file from; a local disk drive or streaming from an https:// or s3:// path.
+        The default auto-detects based on the form of the path.
+        When streaming, the default method is "fsspec".
+        Note that "ros3" is specific to HDF5 backend files.
+    backend : "hdf5", "zarr", or None (default)
+        Type of backend used to write the file.
+        The default auto-detects the type of the file.
+
+    Returns
+    -------
+    nwbfile : pynwb.NWBFile
+        The in-memory NWBFile object.
+    """
+    nwbfile, _ = _read_nwbfile_helper(nwbfile_path=nwbfile_path, method=method, backend=backend)
+
+    # Note: do not be concerned about IO object closing due to garbage collection here
+    # (the IO object is attached as an attribute to the NWBFile object)
+    return nwbfile
+
+
+def read_nwbfile_and_io(
+    nwbfile_path: Union[str, Path],
+    method: Optional[Literal["local", "fsspec", "ros3"]] = None,
+    backend: Optional[Literal["hdf5", "zarr"]] = None,
+) -> tuple[NWBFile, HDMFIO]:
     """
     Read an NWB file using the specified (or auto-detected) method and specified (or auto-detected) backend.
 
@@ -83,8 +119,6 @@ def read_nwbfile(
     backend : "hdf5", "zarr", or None (default)
         Type of backend used to write the file.
         The default auto-detects the type of the file.
-    return_io : bool, default: False
-        Whether to return the HDMFIO object used to open the file.
 
     Returns
     -------
@@ -94,6 +128,16 @@ def read_nwbfile(
         The initialized HDMFIO object used to read the file.
""" + nwbfile, io = _read_nwbfile_helper(nwbfile_path=nwbfile_path, method=method, backend=backend) + + return nwbfile, io + + +def _read_nwbfile_helper( + nwbfile_path: Union[str, Path], + method: Optional[Literal["local", "fsspec", "ros3"]] = None, + backend: Optional[Literal["hdf5", "zarr"]] = None, +) -> tuple[NWBFile, HDMFIO]: nwbfile_path = str(nwbfile_path) # If pathlib.Path, cast to str; if already str, no harm done method = method or _get_method(nwbfile_path) @@ -112,13 +156,14 @@ def read_nwbfile( "The ROS3 method was selected, but the URL starts with 's3://'! Please switch to an 'https://' URL." ) - backend = backend or _get_backend(nwbfile_path, method) - if method == "local" and not BACKEND_IO_CLASSES[ # Temporary until .can_read() is able to work on streamed bytes - backend - ].can_read(path=nwbfile_path): - raise IOError(f"The chosen backend ({backend}) is unable to read the file! Please select a different backend.") + chosen_backend = backend or _get_backend(path=nwbfile_path, method=method) + # Temporary until .can_read() is able to work on streamed bytes + if method == "local" and not BACKEND_IO_CLASSES[chosen_backend].can_read(path=nwbfile_path): + raise IOError( + f"The chosen backend ({chosen_backend}) is unable to read the file! Please select a different backend." + ) - # Filter out some of most common warnings that don't really matter with `load_namespaces=True` + # Filter out some common warnings that don't really matter with `load_namespaces=True` filterwarnings(action="ignore", message="No cached namespaces found in .*") filterwarnings(action="ignore", message="Ignoring cached namespace .*") io_kwargs = dict(mode="r", load_namespaces=True) @@ -131,10 +176,7 @@ def read_nwbfile( io_kwargs.update(path=nwbfile_path) if method == "ros3": io_kwargs.update(driver="ros3") - io = BACKEND_IO_CLASSES[backend](**io_kwargs) + io = BACKEND_IO_CLASSES[chosen_backend](**io_kwargs) nwbfile = io.read() - if return_io: - return (nwbfile, io) - else: # Note: do not be concerned about io object closing due to garbage collection here - return nwbfile # (it is attached as an attribute to the NWBFile object) + return nwbfile, io diff --git a/src/nwbinspector/utils/_utils.py b/src/nwbinspector/utils/_utils.py index a1fddd411..6111a188e 100644 --- a/src/nwbinspector/utils/_utils.py +++ b/src/nwbinspector/utils/_utils.py @@ -6,7 +6,7 @@ from functools import lru_cache from importlib import import_module from pathlib import Path -from typing import Optional, TypeVar, Union +from typing import Any, Optional, TypeVar, Union import h5py import numpy as np @@ -52,7 +52,7 @@ def cache_data_selection(data: Union[h5py.Dataset, ArrayLike], selection: Union[ return _cache_data_retrieval_command(data=data, reduced_selection=reduced_selection) -def format_byte_size(byte_size: int, units: str = "SI"): +def format_byte_size(byte_size: int, units: str = "SI") -> str: """ Format a number representing a total number of bytes into a convenient unit. @@ -65,7 +65,7 @@ def format_byte_size(byte_size: int, units: str = "SI"): May be either SI (orders of 1000) or binary (in memory, orders of 1024). The default is SI. 
""" - num = byte_size + num = float(byte_size) prefixes = ["", "K", "M", "G", "T", "P", "E", "Z"] if units == "SI": order = 1000.0 @@ -79,16 +79,18 @@ def format_byte_size(byte_size: int, units: str = "SI"): if abs(num) < order: return f"{num:3.2f}{prefix}{suffix}" num /= order + return f"{num:.2f}Y{suffix}" -def is_regular_series(series: np.ndarray, tolerance_decimals: int = 9): +def is_regular_series(series: np.ndarray, tolerance_decimals: int = 9) -> bool: """General purpose function for checking if the difference between all consecutive points in a series are equal.""" uniq_diff_ts = np.unique(np.diff(series).round(decimals=tolerance_decimals)) + return len(uniq_diff_ts) == 1 -def is_ascending_series(series: Union[h5py.Dataset, ArrayLike], nelems: Optional[int] = None): +def is_ascending_series(series: Union[h5py.Dataset, ArrayLike], nelems: Optional[int] = None) -> bool: """General purpose function for determining if a series is monotonic increasing.""" if isinstance(series, h5py.Dataset): data = cache_data_selection(data=series, selection=slice(nelems)) @@ -105,7 +107,7 @@ def is_ascending_series(series: Union[h5py.Dataset, ArrayLike], nelems: Optional return np.all(differences >= 0) -def is_dict_in_string(string: str): +def is_dict_in_string(string: str) -> bool: """ Determine if the string value contains an encoded Python dictionary. @@ -114,7 +116,7 @@ def is_dict_in_string(string: str): return any(re.findall(pattern=dict_regex, string=string)) -def is_string_json_loadable(string: str): +def is_string_json_loadable(string: str) -> bool: """ Determine if the serialized dictionary is a JSON object. @@ -127,7 +129,7 @@ def is_string_json_loadable(string: str): return False -def is_module_installed(module_name: str): +def is_module_installed(module_name: str) -> bool: """ Check if the given module is installed on the system. @@ -162,6 +164,7 @@ def get_package_version(name: str) -> version.Version: from pkg_resources import get_distribution package_version = get_distribution(name).version + return version.parse(package_version) @@ -176,7 +179,7 @@ def calculate_number_of_cpu(requested_cpu: int = 1) -> int: The default is 1. """ - total_cpu = os.cpu_count() + total_cpu = os.cpu_count() or 1 # Annotations say os.cpu_count can return None for some reason assert requested_cpu <= total_cpu, f"Requested more CPUs ({requested_cpu}) than are available ({total_cpu})!" assert requested_cpu >= -( total_cpu - 1 @@ -187,11 +190,12 @@ def calculate_number_of_cpu(requested_cpu: int = 1) -> int: return total_cpu + requested_cpu -def get_data_shape(data, strict_no_data_load=False): +def get_data_shape(data: Any, strict_no_data_load: bool = False) -> Optional[tuple[int, ...]]: """ - modified from hdmf.utils.get_data_shape to return shape instead of maxshape Helper function used to determine the shape of the given array. + Modified from hdmf.utils.get_data_shape to return shape instead of maxshape. + In order to determine the shape of nested tuples, lists, and sets, this function recursively inspects elements along the dimensions, assuming that the data has a regular, rectangular shape. In the case of out-of-core iterators, this means that the first item @@ -199,15 +203,22 @@ def get_data_shape(data, strict_no_data_load=False): to enforce that this does not happen, at the cost that we may not be able to determine the shape of the array. - :param data: Array for which we should determine the shape. - :type data: List, numpy.ndarray, DataChunkIterator, any object that support __len__ or .shape. 
-    :param strict_no_data_load: If True and data is an out-of-core iterator, None may be returned. If False (default),
-        the first element of data may be loaded into memory.
-    :return: Tuple of ints indicating the size of known dimensions. Dimensions for which the size is unknown
-        will be set to None.
+    Parameters
+    ----------
+    data : list, numpy.ndarray, DataChunkIterator, any object that supports __len__ or .shape.
+        Array for which we should determine the shape.
+    strict_no_data_load : bool
+        If True and data is an out-of-core iterator, None may be returned.
+        If False (default), the first element of data may be loaded into memory.
+
+    Returns
+    -------
+    data_shape : tuple[int], optional
+        Tuple of ints indicating the size of known dimensions.
+        Dimensions for which the size is unknown will be set to None.
     """
 
-    def __get_shape_helper(local_data):
+    def __get_shape_helper(local_data: Any) -> tuple[int, ...]:
         shape = list()
         if hasattr(local_data, "__len__"):
             shape.append(len(local_data))
@@ -220,11 +231,13 @@ def __get_shape_helper(local_data):
     if hasattr(data, "shape") and data.shape is not None:
         return data.shape
     if isinstance(data, dict):
-        return
+        return None
     if hasattr(data, "__len__") and not isinstance(data, (str, bytes)):
         if not strict_no_data_load or isinstance(data, (list, tuple, set)):
             return __get_shape_helper(data)
 
+    return None
+
 
 def strtobool(val: str) -> bool:
     """
diff --git a/tests/test_inspector.py b/tests/test_inspector.py
index b3d86c916..2d4859692 100644
--- a/tests/test_inspector.py
+++ b/tests/test_inspector.py
@@ -3,7 +3,7 @@
 from pathlib import Path
 from shutil import rmtree
 from tempfile import mkdtemp
-from typing import Type
+from typing import Type, Union
 from unittest import TestCase
 
 import hdmf_zarr
@@ -118,7 +118,7 @@ class TestInspectorOnBackend(TestCase):
     skip_validate = False  # TODO: can be removed once NWBZarrIO validation issues are resolved
 
     @staticmethod
-    def assertFileExists(path: FilePathType):
+    def assertFileExists(path: Union[str, Path]):
         path = Path(path)
         assert path.exists()
 
@@ -882,7 +882,7 @@ def test_dandi_config_in_vitro_injection():
     assert messages == []
 
 
-def test_dandi_config_in_vitro_injection():
+def test_dandi_config_in_vitro_injection_safe():
     """Test the safe subject ID retrieval of the in vitro injection."""
     nwbfile = make_minimal_nwbfile()
     nwbfile.subject = Subject(subject_id=None, description="A detailed description about the in vitro setup.")
diff --git a/tests/unit_tests/test_general.py b/tests/unit_tests/test_general.py
index 53cadffc4..10ebc6551 100644
--- a/tests/unit_tests/test_general.py
+++ b/tests/unit_tests/test_general.py
@@ -6,14 +6,14 @@
 
 def test_check_name_slashes_pass():
     table = DynamicTable(name="test_name", description="")
-    assert check_name_slashes(obj=table) is None
+    assert check_name_slashes(neurodata_object=table) is None
 
 
 def test_check_name_slashes_fail():
     """HDMF/PyNWB forbid "/" in the object names.
Might need an external file written in MATLAB to test that?""" for x in ["\\"]: table = DynamicTable(name=f"test{x}ing", description="") - assert check_name_slashes(obj=table) == InspectorMessage( + assert check_name_slashes(neurodata_object=table) == InspectorMessage( message="Object name contains slashes.", importance=Importance.CRITICAL, check_function_name="check_name_slashes", @@ -25,12 +25,12 @@ def test_check_name_slashes_fail(): def test_check_description_pass(): table = DynamicTable(name="test", description="testing") - assert check_description(obj=table) is None + assert check_description(neurodata_object=table) is None def test_check_description_fail(): table = DynamicTable(name="test", description="No Description.") - assert check_description(obj=table) == InspectorMessage( + assert check_description(neurodata_object=table) == InspectorMessage( message="Description ('No Description.') is a placeholder.", importance=Importance.BEST_PRACTICE_SUGGESTION, check_function_name="check_description", @@ -42,7 +42,7 @@ def test_check_description_fail(): def test_check_description_missing(): table = DynamicTable(name="test", description=" ") - assert check_description(obj=table) == InspectorMessage( + assert check_description(neurodata_object=table) == InspectorMessage( message="Description is missing.", importance=Importance.BEST_PRACTICE_SUGGESTION, check_function_name="check_description",