diff --git a/CHANGELOG.md b/CHANGELOG.md index a6826f2fa..2e0a75d6f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,10 @@ * Add tool function `nwb_helpers.get_configurable_datasets` and corresponding private methods and dataclass for detecting datasets from an in-memory `NWBFile` that can be wrapped in an H5DataIO before being written to a new or existing file. [PR #549](https://github.com/catalystneuro/neuroconv/pull/549) +* Add tool function `nwb_helpers.get_default_dataset_configurations(nwbfile: NWBFile) -> Dict[Dataset, DatasetConfiguration]` and Pydantic models `BackendConfiguration` for representing top-level backend configuration and `nwb_helpers.DatasetConfiguration` for representing configurable properties of the datasets (chunking & compression options) depending on each backend before writing to disk. + +* Add tool function `nwb_helpers.configure_datasets(nwbfile: NWBFile, dataset_configurations: Dict[Dataset, DatasetConfiguration])` for configuring backend and dataset options for an `NWBFile` before writing to disk. + # v0.4.1 diff --git a/requirements-minimal.txt b/requirements-minimal.txt index 447077ecc..06b339cb4 100644 --- a/requirements-minimal.txt +++ b/requirements-minimal.txt @@ -3,7 +3,7 @@ jsonschema>=3.2.0 PyYAML>=5.4 scipy>=1.4.1 h5py>=2.10.0 -hdmf>=3.4.7 +hdmf @ git+https://github.com/hdmf-dev/hdmf.git@master hdmf_zarr>=0.3.0 pynwb>=2.3.2;python_version>='3.8' psutil>=5.8.0 diff --git a/src/neuroconv/tools/hdmf.py b/src/neuroconv/tools/hdmf.py index 6ad712d51..b82b12e1f 100644 --- a/src/neuroconv/tools/hdmf.py +++ b/src/neuroconv/tools/hdmf.py @@ -1,35 +1,77 @@ """Collection of modifications of HDMF functions that are to be tested/used on this repo until propagation upstream.""" +import math from typing import Tuple import numpy as np from hdmf.data_utils import GenericDataChunkIterator as HDMFGenericDataChunkIterator +from pydantic import Field +from typing_extensions import Annotated class GenericDataChunkIterator(HDMFGenericDataChunkIterator): def _get_default_buffer_shape(self, buffer_gb: float = 1.0) -> Tuple[int]: - num_axes = len(self.maxshape) - chunk_bytes = np.prod(self.chunk_shape) * self.dtype.itemsize + return self.estimate_default_buffer_shape( + buffer_gb=buffer_gb, chunk_shape=self.chunk_shape, maxshape=self.maxshape, dtype=self.dtype + ) + + # TODO: move this to the core iterator in HDMF so it can be easily swapped out as well as run on its own + @staticmethod + def estimate_default_chunk_shape( + chunk_mb: Annotated[float, Field(gt=0.0)], + maxshape: Tuple[int, ...], + dtype: np.dtype, + ) -> Tuple[int, ...]: + """ + Select chunk shape with size in MB less than the threshold of chunk_mb. + + Keeps the dimensional ratios of the original data. + """ + assert chunk_mb > 0.0, f"chunk_mb ({chunk_mb}) must be greater than zero!" 
+ # Eventually, Pydantic validation can handle this validation for us + + n_dims = len(maxshape) + itemsize = dtype.itemsize + chunk_bytes = chunk_mb * 1e6 + + min_maxshape = min(maxshape) + v = tuple(math.floor(maxshape_axis / min_maxshape) for maxshape_axis in maxshape) + prod_v = math.prod(v) + while prod_v * itemsize > chunk_bytes and prod_v != 1: + non_unit_min_v = min(x for x in v if x != 1) + v = tuple(math.floor(x / non_unit_min_v) if x != 1 else x for x in v) + prod_v = math.prod(v) + k = math.floor((chunk_bytes / (prod_v * itemsize)) ** (1 / n_dims)) + return tuple([min(k * x, maxshape[dim]) for dim, x in enumerate(v)]) + + # TODO: move this to the core iterator in HDMF so it can be easily swapped out as well as run on its own + @staticmethod + def estimate_default_buffer_shape( + buffer_gb: Annotated[float, Field(gt=0.0)], + chunk_shape: Tuple[int, ...], + maxshape: Tuple[int, ...], + dtype: np.dtype, + ) -> Tuple[int]: + num_axes = len(maxshape) + chunk_bytes = math.prod(chunk_shape) * dtype.itemsize assert buffer_gb > 0, f"buffer_gb ({buffer_gb}) must be greater than zero!" assert ( buffer_gb >= chunk_bytes / 1e9 ), f"buffer_gb ({buffer_gb}) must be greater than the chunk size ({chunk_bytes / 1e9})!" - assert all( - np.array(self.chunk_shape) > 0 - ), f"Some dimensions of chunk_shape ({self.chunk_shape}) are less than zero!" + assert all(np.array(chunk_shape) > 0), f"Some dimensions of chunk_shape ({chunk_shape}) are less than zero!" - maxshape = np.array(self.maxshape) + maxshape = np.array(maxshape) # Early termination condition - if np.prod(maxshape) * self.dtype.itemsize / 1e9 < buffer_gb: - return tuple(self.maxshape) + if math.prod(maxshape) * dtype.itemsize / 1e9 < buffer_gb: + return tuple(maxshape) buffer_bytes = chunk_bytes - axis_sizes_bytes = maxshape * self.dtype.itemsize - smallest_chunk_axis, second_smallest_chunk_axis, *_ = np.argsort(self.chunk_shape) + axis_sizes_bytes = maxshape * dtype.itemsize + smallest_chunk_axis, second_smallest_chunk_axis, *_ = np.argsort(chunk_shape) target_buffer_bytes = buffer_gb * 1e9 # If the smallest full axis does not fit within the buffer size, form a square along the two smallest axes - sub_square_buffer_shape = np.array(self.chunk_shape) + sub_square_buffer_shape = np.array(chunk_shape) if min(axis_sizes_bytes) > target_buffer_bytes: k1 = np.floor((target_buffer_bytes / chunk_bytes) ** 0.5) for axis in [smallest_chunk_axis, second_smallest_chunk_axis]: @@ -40,32 +82,32 @@ def _get_default_buffer_shape(self, buffer_gb: float = 1.0) -> Tuple[int]: chunk_to_buffer_ratio = buffer_gb * 1e9 / chunk_bytes chunk_scaling_factor = np.floor(chunk_to_buffer_ratio ** (1 / num_axes)) unpadded_buffer_shape = [ - np.clip(a=int(x), a_min=self.chunk_shape[j], a_max=self.maxshape[j]) - for j, x in enumerate(chunk_scaling_factor * np.array(self.chunk_shape)) + np.clip(a=int(x), a_min=chunk_shape[j], a_max=maxshape[j]) + for j, x in enumerate(chunk_scaling_factor * np.array(chunk_shape)) ] - unpadded_buffer_bytes = np.prod(unpadded_buffer_shape) * self.dtype.itemsize + unpadded_buffer_bytes = math.prod(unpadded_buffer_shape) * dtype.itemsize # Method that starts by filling the smallest axis completely or calculates best partial fill - padded_buffer_shape = np.array(self.chunk_shape) - chunks_per_axis = np.ceil(maxshape / self.chunk_shape) + padded_buffer_shape = np.array(chunk_shape) + chunks_per_axis = np.ceil(maxshape / chunk_shape) small_axis_fill_size = chunk_bytes * min(chunks_per_axis) full_axes_used = np.zeros(shape=num_axes, dtype=bool) 
         if small_axis_fill_size <= target_buffer_bytes:
             buffer_bytes = small_axis_fill_size
-            padded_buffer_shape[smallest_chunk_axis] = self.maxshape[smallest_chunk_axis]
+            padded_buffer_shape[smallest_chunk_axis] = maxshape[smallest_chunk_axis]
             full_axes_used[smallest_chunk_axis] = True
         for axis, chunks_on_axis in enumerate(chunks_per_axis):
             if full_axes_used[axis]:  # If the smallest axis, skip since already used
                 continue
             if chunks_on_axis * buffer_bytes <= target_buffer_bytes:  # If multiple axes can be used together
                 buffer_bytes *= chunks_on_axis
-                padded_buffer_shape[axis] = self.maxshape[axis]
+                padded_buffer_shape[axis] = maxshape[axis]
             else:  # Found an axis that is too large to use with the rest of the buffer; calculate how much can be used
                 k3 = np.floor(target_buffer_bytes / buffer_bytes)
                 padded_buffer_shape[axis] *= k3
                 break
-        padded_buffer_bytes = np.prod(padded_buffer_shape) * self.dtype.itemsize
+        padded_buffer_bytes = math.prod(padded_buffer_shape) * dtype.itemsize
 
         if padded_buffer_bytes >= unpadded_buffer_bytes:
             return tuple(padded_buffer_shape)
diff --git a/src/neuroconv/tools/nwb_helpers/__init__.py b/src/neuroconv/tools/nwb_helpers/__init__.py
index ca2aad751..34490b0bb 100644
--- a/src/neuroconv/tools/nwb_helpers/__init__.py
+++ b/src/neuroconv/tools/nwb_helpers/__init__.py
@@ -1,4 +1,18 @@
-from ._dataset_configuration import ConfigurableDataset, get_configurable_datasets
+from ._dataset_and_backend_models import (
+    BACKEND_TO_CONFIGURATION,
+    BACKEND_TO_DATASET_CONFIGURATION,
+    BackendConfiguration,
+    DatasetConfiguration,
+    DatasetInfo,
+    HDF5BackendConfiguration,
+    HDF5DatasetConfiguration,
+    ZarrBackendConfiguration,
+    ZarrDatasetConfiguration,
+)
+from ._dataset_configuration import (
+    get_default_backend_configuration,
+    get_default_dataset_configurations,
+)
 from ._metadata_and_file_helpers import (
     add_device_from_metadata,
     get_default_nwbfile_metadata,
diff --git a/src/neuroconv/tools/nwb_helpers/_dataset_and_backend_models.py b/src/neuroconv/tools/nwb_helpers/_dataset_and_backend_models.py
new file mode 100644
index 000000000..789510e2a
--- /dev/null
+++ b/src/neuroconv/tools/nwb_helpers/_dataset_and_backend_models.py
@@ -0,0 +1,204 @@
+"""Collection of helper functions related to configuration of datasets dependent on backend."""
+from typing import Any, Dict, Literal, Tuple, Type, Union
+
+import h5py
+import hdf5plugin
+import psutil
+import zarr
+from hdmf.backends.hdf5 import H5DataIO
+from hdmf.container import DataIO
+from hdmf_zarr import ZarrDataIO
+from nwbinspector.utils import is_module_installed
+from pydantic import BaseModel, Field, root_validator
+
+
+class DatasetInfo(BaseModel):
+    object_id: str
+    location: str
+    maxshape: Tuple[int, ...]
+    dtype: str  # Think about how to constrain/specify this more
+
+    class Config:  # noqa: D106
+        allow_mutation = False
+
+    def __hash__(self):
+        """To allow instances of this class to be used as keys in dictionaries."""
+        return hash((type(self),) + tuple(self.__dict__.values()))
+
+
+class DatasetConfiguration(BaseModel):
+    """A data model for configuring options about an object that will become an HDF5 or Zarr Dataset in the file."""
+
+    dataset_info: DatasetInfo
+    chunk_shape: Tuple[int, ...]
+    buffer_shape: Tuple[int, ...]
+    compression_method: Union[str, None]  # Backend configurations should specify Literals; None means no compression
+    compression_options: Union[Dict[str, Any], None] = None
+
+    def __str__(self) -> str:
+        """Not overriding __repr__ as this is intended to render only when wrapped in print()."""
+        string = (
+            f"{self.object_name} of {self.parent}\n"
+            + f"{'-' * (len(self.object_name) + 4 + len(self.parent))}\n"
+            + f"  {self.field}\n"
+            + f"    maxshape: {self.maxshape}\n"
+            + f"    dtype: {self.dtype}"
+        )
+        return string
+
+
+_available_hdf5_filters = set(h5py.filters.decode) - set(("shuffle", "fletcher32", "scaleoffset"))
+if is_module_installed(module_name="hdf5plugin"):
+    _available_hdf5_filters = _available_hdf5_filters | set(
+        (filter_.filter_name for filter_ in hdf5plugin.get_filters())
+    )
+AVAILABLE_HDF5_COMPRESSION_METHODS = Literal[tuple(_available_hdf5_filters)]
+
+
+class HDF5DatasetConfiguration(DatasetConfiguration):
+    """A data model for configuring options about an object that will become an HDF5 Dataset in the file."""
+
+    compression_method: Union[AVAILABLE_HDF5_COMPRESSION_METHODS, None] = "gzip"
+    # TODO: actually provide better schematic rendering of options. Only support defaults in GUIDE for now
+    # Looks like they'll have to be hand-typed however... Can try parsing the google docstrings but no annotation typing
+    compression_options: Union[Dict[str, Any], None] = None
+
+
+_available_zarr_filters = (
+    set(zarr.codec_registry.keys())
+    - set(
+        # These filters do nothing for us, or are things that ought to be implemented at lower HDMF levels
+        # or indirectly using HDMF data structures
+        (
+            "json2",  # no data savings
+            "pickle",  # no data savings
+            "vlen-utf8",  # enforced by HDMF
+            "vlen-array",  # enforced by HDMF
+            "vlen-bytes",  # enforced by HDMF
+            "adler32",  # checksum
+            "crc32",  # checksum
+            "fixedscaleoffset",  # enforced indirectly by HDMF/PyNWB data types
+            "base64",  # unsure what this would ever be used for
+            "n5_wrapper",  # different data format
+        )
+    )
+    - set(  # Forbidding lossy codecs for now, but they could be allowed in the future with warnings
+        ("astype", "bitround", "quantize")
+    )
+)
+# TODO: would like to eventually (as separate feature) add an 'auto' method to Zarr
+# to harness the wider range of potential methods that are ideal for certain dtypes or structures
+# E.g., 'packbits' for boolean (logical) VectorData columns
+# | set(("auto",))
+AVAILABLE_ZARR_COMPRESSION_METHODS = Literal[tuple(_available_zarr_filters)]
+
+
+class ZarrDatasetConfiguration(DatasetConfiguration):
+    """A data model for configuring options about an object that will become a Zarr Dataset in the file."""
+
+    filter_methods: Union[Tuple[AVAILABLE_ZARR_COMPRESSION_METHODS, ...], None] = None
+    filter_options: Union[Tuple[Dict[str, Any]], None] = None
+    compression_method: Union[AVAILABLE_ZARR_COMPRESSION_METHODS, None] = "gzip"  # TODO: would like this to be 'auto'
+    # TODO: actually provide better schematic rendering of options. Only support defaults in GUIDE for now
+    # Looks like they'll have to be hand-typed however...
Can try parsing the google docstrings but no annotation typing + compression_options: Union[Dict[str, Any], None] = None + + @root_validator() + def validate_filter_methods_and_options_match(cls, values: Dict[str, Any]): + filter_methods = values["filter_methods"] + filter_options = values["filter_options"] + + if filter_methods is None and filter_options is not None: + raise ValueError(f"`filter_methods` is `None` but `filter_options` is not ({filter_options})!") + elif filter_methods is None and filter_options is None: + return values + + len_filter_methods = len(filter_methods) + len_filter_options = len(filter_options) + if len_filter_methods != len_filter_options: + raise ValueError( + f"Length mismatch between `filter_methods` ({len_filter_methods} methods specified) and " + f"`filter_options` ({len_filter_options} options found)! These two must match one-to-one." + ) + + return values + + # think about extra validation that msgpack2 compression only ideal for datasets of vlen strings + + +class BackendConfiguration(BaseModel): + """A model for matching collections of DatasetConfigurations specific to the HDF5 backend.""" + + backend: Literal["hdf5", "zarr"] + data_io: Type[DataIO] + dataset_configurations: Dict[str, DatasetConfiguration] # str is location field of DatasetConfiguration + + def __str__(self) -> str: + """Not overriding __repr__ as this is intended to render only when wrapped in print().""" + string = ( + f"Configurable datasets identified using the {self.backend} backend\n" + f"{'-' * (43 + len(self.backend) + 8)}\n" + ) + + for dataset_configuration in self.dataset_configurations.values(): + dataset_info = dataset_configuration.dataset_info + string += ( + f"{dataset_info.location}\n" + f" maxshape : {dataset_info.maxshape}\n" + f" dtype : {dataset_info.dtype}\n\n" + f" chunk shape : {dataset_configuration.chunk_shape}\n" + f" buffer shape : {dataset_configuration.buffer_shape}\n" + f" compression method : {dataset_configuration.compression_method}\n" + f" compression options : {dataset_configuration.compression_options}\n\n\n" + ) + + return string + + +class HDF5BackendConfiguration(BackendConfiguration): + """A model for matching collections of DatasetConfigurations specific to the HDF5 backend.""" + + backend: Literal["hdf5"] = "hdf5" + data_io: Type[H5DataIO] = H5DataIO + dataset_configurations: Dict[str, HDF5DatasetConfiguration] # str is location field of DatasetConfiguration + + +class ZarrBackendConfiguration(BackendConfiguration): + """A model for matching collections of DatasetConfigurations specific to the Zarr backend.""" + + backend: Literal["zarr"] = "zarr" + data_io: Type[ZarrDataIO] = ZarrDataIO + dataset_configurations: Dict[str, ZarrDatasetConfiguration] # str is location field of DatasetConfiguration + number_of_jobs: int = Field( + description="Number of jobs to use in parallel during write.", + ge=-psutil.cpu_count(), # TODO: should we specify logical=False in cpu_count? 
+ le=psutil.cpu_count(), + default=-2, # -2 translates to 'all CPU except for one' + ) + + def __str__(self) -> str: + """Not overriding __repr__ as this is intended to render only when wrapped in print().""" + string = ( + f"Configurable datasets identified using the {self.backend} backend\n" + f"{'-' * (43 + len(self.backend) + 8)}\n" + ) + + for dataset_configuration in self.dataset_configurations.values(): + dataset_info = dataset_configuration.dataset_info + string += ( + f"{dataset_info.location}\n" + f" maxshape : {dataset_info.maxshape}\n" + f" dtype : {dataset_info.dtype}\n\n" + f" chunk shape : {dataset_configuration.chunk_shape}\n" + f" buffer shape : {dataset_configuration.buffer_shape}\n" + f" compression method : {dataset_configuration.compression_method}\n" + f" compression options : {dataset_configuration.compression_options}\n" + f" filter methods : {dataset_configuration.filter_methods}\n" + f" filter options : {dataset_configuration.filter_options}\n\n\n" + ) + + return string + + +BACKEND_TO_DATASET_CONFIGURATION = dict(hdf5=HDF5DatasetConfiguration, zarr=ZarrDatasetConfiguration) +BACKEND_TO_CONFIGURATION = dict(hdf5=HDF5BackendConfiguration, zarr=ZarrBackendConfiguration) diff --git a/src/neuroconv/tools/nwb_helpers/_dataset_configuration.py b/src/neuroconv/tools/nwb_helpers/_dataset_configuration.py index f0dd3e920..272d558a3 100644 --- a/src/neuroconv/tools/nwb_helpers/_dataset_configuration.py +++ b/src/neuroconv/tools/nwb_helpers/_dataset_configuration.py @@ -1,56 +1,40 @@ """Collection of helper functions related to configuration of datasets dependent on backend.""" -from typing import Iterable, Literal, Tuple, Union +from typing import Iterable, Literal, Union import h5py +import numpy as np import zarr -from hdmf.data_utils import DataIO +from hdmf import Container +from hdmf.data_utils import DataChunkIterator, DataIO, GenericDataChunkIterator from hdmf.utils import get_data_shape from hdmf_zarr import NWBZarrIO -from pydantic import BaseModel from pynwb import NWBHDF5IO, NWBFile, TimeSeries from pynwb.base import DynamicTable - -class ConfigurableDataset(BaseModel): - """A data model for summarizing information about an object that will become a HDF5 or Zarr Dataset in the file.""" - - object_id: str - object_name: str - parent: str - field: Literal["data", "timestamps"] - maxshape: Tuple[int, ...] 
- dtype: str # Think about how to constrain/specify this more - - def __str__(self) -> str: - """Not overriding __repr__ as this is intended to render only when wrapped in print().""" - string = ( - f"{self.object_name} of {self.parent}\n" - + f"{'-' * (len(self.object_name) + 4 + len(self.parent))}\n" - + f" {self.field}\n" - + f" maxshape: {self.maxshape}\n" - + f" dtype: {self.dtype}" - ) - return string - - -def _get_dataset_metadata(neurodata_object: Union[TimeSeries, DynamicTable], field_name: str) -> ConfigurableDataset: - """Fill in the Dataset model with as many values as can be automatically detected or inferred.""" - field_value = getattr(neurodata_object, field_name) - if field_value is not None and not isinstance(field_value, DataIO): - return ConfigurableDataset( - object_id=neurodata_object.object_id, - object_name=neurodata_object.name, - parent=neurodata_object.get_ancestor().name, - field=field_name, - maxshape=get_data_shape(data=field_value), - # think on cases that don't have a dtype attr - dtype=str(getattr(field_value, "dtype", "unknown")), - ) - - -def _value_already_written_to_file( - value: Union[h5py.Dataset, zarr.Array], - backend_type: Literal["hdf5", "zarr"], +from ._dataset_and_backend_models import ( + BACKEND_TO_CONFIGURATION, + BACKEND_TO_DATASET_CONFIGURATION, + DatasetConfiguration, + DatasetInfo, + HDF5BackendConfiguration, + HDF5DatasetConfiguration, + ZarrBackendConfiguration, + ZarrDatasetConfiguration, +) +from ..hdmf import SliceableDataChunkIterator + + +def _get_mode(io: Union[NWBHDF5IO, NWBZarrIO]) -> str: + """NWBHDF5IO and NWBZarrIO have different ways of storing the mode they used on a path.""" + if isinstance(io, NWBHDF5IO): + return io.mode + elif isinstance(io, NWBZarrIO): + return io._ZarrIO__mode + + +def _is_value_already_written_to_file( + candidate_dataset: Union[h5py.Dataset, zarr.Array], + backend: Literal["hdf5", "zarr"], existing_file: Union[h5py.File, zarr.Group, None], ) -> bool: """ @@ -59,17 +43,85 @@ def _value_already_written_to_file( This object should then be skipped by the `get_io_datasets` function when working in append mode. """ return ( - isinstance(value, h5py.Dataset) # If the source data is an HDF5 Dataset - and backend_type == "hdf5" # If working in append mode - and value.file == existing_file # If the source HDF5 Dataset is the appending NWBFile + isinstance(candidate_dataset, h5py.Dataset) # If the source data is an HDF5 Dataset + and backend == "hdf5" # If working in append mode + and candidate_dataset.file == existing_file # If the source HDF5 Dataset is the appending NWBFile ) or ( - isinstance(value, zarr.Array) # If the source data is an Zarr Array - and backend_type == "zarr" # If working in append mode - and value.store == existing_file # If the source Zarr 'file' is the appending NWBFile + isinstance(candidate_dataset, zarr.Array) # If the source data is an Zarr Array + and backend == "zarr" # If working in append mode + and candidate_dataset.store == existing_file # If the source Zarr 'file' is the appending NWBFile + ) + + +def _parse_location_in_memory_nwbfile(current_location: str, neurodata_object: Container) -> str: + parent = neurodata_object.parent + if isinstance(parent, NWBFile): + # Items in defined top-level places like acquisition, intervals, etc. 
do not act as 'containers' + # in the .parent sense; ask if object is in their in-memory dictionaries instead + for outer_field_name, outer_field_value in parent.fields.items(): + if isinstance(outer_field_value, dict) and neurodata_object.name in outer_field_value: + return outer_field_name + "/" + neurodata_object.name + "/" + current_location + return neurodata_object.name + "/" + current_location + return _parse_location_in_memory_nwbfile( + current_location=neurodata_object.name + "/" + current_location, neurodata_object=parent ) -def get_configurable_datasets(nwbfile: NWBFile) -> Iterable[ConfigurableDataset]: +def _get_dataset_metadata( + neurodata_object: Union[TimeSeries, DynamicTable], field_name: str, backend: Literal["hdf5", "zarr"] +) -> Union[HDF5DatasetConfiguration, ZarrDatasetConfiguration]: + """Fill in the Dataset model with as many values as can be automatically detected or inferred.""" + DatasetConfigurationClass = BACKEND_TO_DATASET_CONFIGURATION[backend] + + candidate_dataset = getattr(neurodata_object, field_name) + # For now, skip over datasets already wrapped in DataIO + # Could maybe eventually support modifying chunks in place + # But setting buffer shape only possible if iterator was wrapped first + if not isinstance(candidate_dataset, DataIO): + # DataChunkIterator has best generic dtype inference, though logic is hard to peel out of it + # And it can fail in rare cases but not essential to our default configuration + try: + dtype = str(DataChunkIterator(candidate_dataset).dtype) # string cast to be JSON friendly + except Exception as exception: + if str(exception) != "Data type could not be determined. Please specify dtype in DataChunkIterator init.": + raise exception + else: + dtype = "unknown" + + maxshape = get_data_shape(data=candidate_dataset) + + if isinstance(candidate_dataset, GenericDataChunkIterator): + chunk_shape = candidate_dataset.chunk_shape + buffer_shape = candidate_dataset.buffer_shape + elif dtype != "unknown": + # TODO: eventually replace this with staticmethods on hdmf.data_utils.GenericDataChunkIterator + chunk_shape = SliceableDataChunkIterator.estimate_default_chunk_shape( + chunk_mb=10.0, maxshape=maxshape, dtype=np.dtype(dtype) + ) + buffer_shape = SliceableDataChunkIterator.estimate_default_buffer_shape( + buffer_gb=0.5, chunk_shape=chunk_shape, maxshape=maxshape, dtype=np.dtype(dtype) + ) + else: + pass # TODO: think on this; perhaps zarr's standalone estimator? + + dataset_info = DatasetInfo( + object_id=neurodata_object.object_id, + object_name=neurodata_object.name, + location=_parse_location_in_memory_nwbfile(current_location=field_name, neurodata_object=neurodata_object), + field=field_name, + maxshape=maxshape, + dtype=dtype, + ) + dataset_configuration = DatasetConfigurationClass( + dataset_info=dataset_info, chunk_shape=chunk_shape, buffer_shape=buffer_shape + ) + return dataset_configuration + + +def get_default_dataset_configurations( + nwbfile: NWBFile, + backend: Union[None, Literal["hdf5", "zarr"]] = None, # None for auto-detect from append mode, otherwise required +) -> Iterable[DatasetConfiguration]: """ Method for automatically detecting all objects in the file that could be wrapped in a DataIO. @@ -77,40 +129,85 @@ def get_configurable_datasets(nwbfile: NWBFile) -> Iterable[ConfigurableDataset] ---------- nwbfile : pynwb.NWBFile An in-memory NWBFile object, either generated from the base class or read from an existing file of any backend. 
+    backend : "hdf5" or "zarr"
+        Which backend format type you would like to use in configuring each dataset's compression methods and options.
 
     Yields
     ------
-    Dataset
+    DatasetConfiguration
         A summary of each detected object that can be wrapped in a DataIO.
     """
-    backend_type = None  # Used for filtering out datasets that have already been written to disk when appending
+    if backend is None and nwbfile.read_io is None:
+        raise ValueError(
+            "Keyword argument `backend` (either 'hdf5' or 'zarr') must be specified if the `nwbfile` was not "
+            "read from an existing file!"
+        )
+    if backend is None and nwbfile.read_io is not None and nwbfile.read_io.mode not in ("r+", "a"):
+        raise ValueError(
+            "Keyword argument `backend` (either 'hdf5' or 'zarr') must be specified if the `nwbfile` was read from an existing file but is not being appended to."
+        )
+
+    detected_backend = None
     existing_file = None
-    if isinstance(nwbfile.read_io, NWBHDF5IO):
-        backend_type = "hdf5"
+    if isinstance(nwbfile.read_io, NWBHDF5IO) and _get_mode(io=nwbfile.read_io) in ("r+", "a"):
+        detected_backend = "hdf5"
         existing_file = nwbfile.read_io._file
-    elif isinstance(nwbfile.read_io, NWBZarrIO):
-        backend_type = "zarr"
+    elif isinstance(nwbfile.read_io, NWBZarrIO) and _get_mode(io=nwbfile.read_io) in ("r+", "a"):
+        detected_backend = "zarr"
         existing_file = nwbfile.read_io.file.store
+    backend = backend or detected_backend
+
+    if detected_backend is not None and detected_backend != backend:
+        raise ValueError(
+            f"Detected backend '{detected_backend}' for appending file, but specified `backend` "
+            f"({backend}) does not match! Set `backend=None` or remove the keyword argument to allow it to auto-detect."
+        )
 
-    for _, neurodata_object in nwbfile.objects.items():
-        # TODO: edge case of ImageSeries with external file mode?
+    for neurodata_object in nwbfile.objects.values():
         if isinstance(neurodata_object, TimeSeries):
+            time_series = neurodata_object  # for readability
+
             for field_name in ("data", "timestamps"):
-                if field_name not in neurodata_object.fields:  # timestamps is optional
+                if field_name not in time_series.fields:  # timestamps is optional
                     continue
-                if _value_already_written_to_file(
-                    value=getattr(neurodata_object, field_name),
-                    backend_type=backend_type,
-                    existing_file=existing_file,
+
+                candidate_dataset = getattr(time_series, field_name)
+                if _is_value_already_written_to_file(
+                    candidate_dataset=candidate_dataset, backend=backend, existing_file=existing_file
                 ):
                     continue  # skip
-                yield _get_dataset_metadata(neurodata_object=neurodata_object, field_name=field_name)
+                # Edge case of in-memory ImageSeries with external mode; data is in fields and is empty array
+                if isinstance(candidate_dataset, np.ndarray) and not np.any(candidate_dataset):
+                    continue  # skip
+
+                yield _get_dataset_metadata(neurodata_object=time_series, field_name=field_name, backend=backend)
         elif isinstance(neurodata_object, DynamicTable):
-            for column_name in getattr(neurodata_object, "colnames"):
-                if _value_already_written_to_file(
-                    value=getattr(neurodata_object, column_name), backend_type=backend_type, existing_file=existing_file
+            dynamic_table = neurodata_object  # for readability
+
+            for column_name in dynamic_table.colnames:
+                candidate_dataset = dynamic_table[column_name].data  # VectorData object
+                if _is_value_already_written_to_file(
+                    candidate_dataset=candidate_dataset, backend=backend, existing_file=existing_file
                ):
                     continue  # skip
-                yield _get_dataset_metadata(neurodata_object=neurodata_object[column_name], field_name="data")
+                yield _get_dataset_metadata(
neurodata_object=dynamic_table[column_name], field_name="data", backend=backend + ) + + +def get_default_backend_configuration( + nwbfile: NWBFile, backend: Literal["hdf5", "zarr"] +) -> Union[HDF5BackendConfiguration, ZarrBackendConfiguration]: + """Fill a default backend configuration to serve as a starting point for further customization.""" + BackendConfigurationClass = BACKEND_TO_CONFIGURATION[backend] + + default_dataset_configurations = get_default_dataset_configurations(nwbfile=nwbfile, backend=backend) + dataset_configurations = { + default_dataset_configuration.dataset_info.location: default_dataset_configuration + for default_dataset_configuration in default_dataset_configurations + } + + backend_configuration = BackendConfigurationClass(dataset_configurations=dataset_configurations) + return backend_configuration diff --git a/tests/test_minimal/test_tools/test_backend_and_dataset_configuration/test_configurable_dataset.py b/tests/test_minimal/test_tools/test_backend_and_dataset_configuration/test_configurable_dataset.py index 90d60eb2f..93572ff3a 100644 --- a/tests/test_minimal/test_tools/test_backend_and_dataset_configuration/test_configurable_dataset.py +++ b/tests/test_minimal/test_tools/test_backend_and_dataset_configuration/test_configurable_dataset.py @@ -4,44 +4,43 @@ from neuroconv.tools.nwb_helpers import ConfigurableDataset - -def test_configurable_dataset_print(): - """Test the printout display of a Dataset modellooks nice.""" - test_dataset = ConfigurableDataset( - object_id="abc123", - object_name="TestObject", - parent="TestParent", - field="data", - maxshape=(2, 4), - dtype="int16", - ) - - with patch("sys.stdout", new=StringIO()) as out: - print(test_dataset) - - expected_print = """TestObject of TestParent ------------------------- - data - maxshape: (2, 4) - dtype: int16 -""" - assert out.getvalue() == expected_print - - -def test_configurable_dataset_repr(): - """Test the programmatic repr of a Dataset model is more dataclass-like.""" - test_dataset = ConfigurableDataset( - object_id="abc123", - object_name="TestObject", - parent="TestParent", - field="data", - maxshape=(2, 4), - dtype="int16", - ) - - # Important to keep the `repr` unmodified for appearance inside lists of Datasets - expected_repr = ( - "ConfigurableDataset(object_id='abc123', object_name='TestObject', parent='TestParent', " - "field='data', maxshape=(2, 4), dtype='int16')" - ) - assert repr(test_dataset) == expected_repr +# def test_configurable_dataset_print(): +# """Test the printout display of a Dataset modellooks nice.""" +# test_dataset = ConfigurableDataset( +# object_id="abc123", +# object_name="TestObject", +# parent="TestParent", +# field="data", +# maxshape=(2, 4), +# dtype="int16", +# ) + +# with patch("sys.stdout", new=StringIO()) as out: +# print(test_dataset) + +# expected_print = """TestObject of TestParent +# ------------------------ +# data +# maxshape: (2, 4) +# dtype: int16 +# """ +# assert out.getvalue() == expected_print + + +# def test_configurable_dataset_repr(): +# """Test the programmatic repr of a Dataset model is more dataclass-like.""" +# test_dataset = ConfigurableDataset( +# object_id="abc123", +# object_name="TestObject", +# parent="TestParent", +# field="data", +# maxshape=(2, 4), +# dtype="int16", +# ) + +# # Important to keep the `repr` unmodified for appearance inside lists of Datasets +# expected_repr = ( +# "ConfigurableDataset(object_id='abc123', object_name='TestObject', parent='TestParent', " +# "field='data', maxshape=(2, 4), dtype='int16')" +# ) +# 
assert repr(test_dataset) == expected_repr diff --git a/tests/test_minimal/test_tools/test_backend_and_dataset_configuration/test_get_configurable_datasets.py b/tests/test_minimal/test_tools/test_backend_and_dataset_configuration/test_get_configurable_datasets.py deleted file mode 100644 index 137889f7b..000000000 --- a/tests/test_minimal/test_tools/test_backend_and_dataset_configuration/test_get_configurable_datasets.py +++ /dev/null @@ -1,135 +0,0 @@ -from pathlib import Path - -import numpy as np -import pytest -from hdmf_zarr import NWBZarrIO -from pynwb import NWBHDF5IO, NWBFile -from pynwb.base import DynamicTable -from pynwb.testing.mock.base import mock_TimeSeries -from pynwb.testing.mock.file import mock_NWBFile - -from neuroconv.tools.nwb_helpers import ConfigurableDataset, get_configurable_datasets - - -def generate_1d_array() -> NWBFile: - array = np.array([[0.1, 0.2, 0.3]]) - return array - - -def generate_2d_array() -> NWBFile: - array = np.array([[1, 2, 3], [4, 5, 6]]) - return array - - -def generate_nwbfile_with_ConfigurableDatasets() -> NWBFile: - nwbfile = mock_NWBFile() - array = generate_2d_array() - time_series = mock_TimeSeries(data=array) - nwbfile.add_acquisition(time_series) - return nwbfile - - -@pytest.fixture(scope="session") -def hdf5_nwbfile_path(tmpdir_factory): - nwbfile_path = tmpdir_factory.mktemp("data").join("test_hdf5_nwbfile_with_configurable_datasets.nwb.h5") - if not Path(nwbfile_path).exists(): - nwbfile = generate_nwbfile_with_ConfigurableDatasets() - with NWBHDF5IO(path=str(nwbfile_path), mode="w") as io: - io.write(nwbfile) - return str(nwbfile_path) - - -@pytest.fixture(scope="session") -def zarr_nwbfile_path(tmpdir_factory): - nwbfile_path = tmpdir_factory.mktemp("data").join("test_zarr_nwbfile_with_configurable_datasets.nwb.zarr") - if not Path(nwbfile_path).exists(): - nwbfile = generate_nwbfile_with_ConfigurableDatasets() - with NWBZarrIO(path=str(nwbfile_path), mode="w") as io: - io.write(nwbfile) - return str(nwbfile_path) - - -def test_simple_time_series(): - array = generate_2d_array() - - nwbfile = mock_NWBFile() - time_series = mock_TimeSeries(name="TimeSeries", data=array) - nwbfile.add_acquisition(time_series) - - results = list(get_configurable_datasets(nwbfile=nwbfile)) - - assert len(results) == 1 - - result = results[0] - assert isinstance(result, ConfigurableDataset) - assert result.object_name == "TimeSeries" - assert result.field == "data" - assert result.maxshape == array.shape - assert result.dtype == array.dtype - - -def test_simple_dynamic_table(): - array = generate_1d_array() - - nwbfile = mock_NWBFile() - column_length = array.shape[1] - dynamic_table = DynamicTable( - name="DynamicTable", - description="", - id=list(range(column_length)), # Need to include ID since the data of the column is not wrapped in an IO - ) - dynamic_table.add_column(name="TestColumn", description="", data=array.squeeze()) - nwbfile.add_acquisition(dynamic_table) - - results = list(get_configurable_datasets(nwbfile=nwbfile)) - - assert len(results) == 1 - - result = results[0] - assert isinstance(result, ConfigurableDataset) - assert result.object_name == "TestColumn" - assert result.field == "data" - assert result.maxshape == (column_length,) - assert result.dtype == str(array.dtype) - - -def test_simple_on_appended_hdf5_file(hdf5_nwbfile_path): - array = generate_2d_array() - - with NWBHDF5IO(path=hdf5_nwbfile_path, mode="a") as io: - nwbfile = io.read() - array = generate_2d_array() - new_time_series = mock_TimeSeries(name="NewTimeSeries", 
data=array) - nwbfile.add_acquisition(new_time_series) - - results = list(get_configurable_datasets(nwbfile=nwbfile)) - - assert len(results) == 1 - - result = results[0] - assert isinstance(result, ConfigurableDataset) - assert result.object_name == "NewTimeSeries" - assert result.field == "data" - assert result.maxshape == array.shape - assert result.dtype == str(array.dtype) # TODO: add tests for if source specification was np.dtype et al. - - -def test_simple_on_appended_zarr_file(zarr_nwbfile_path): - array = generate_2d_array() - - with NWBZarrIO(path=zarr_nwbfile_path, mode="a") as io: - nwbfile = io.read() - array = generate_2d_array() - new_time_series = mock_TimeSeries(name="NewTimeSeries", data=array) - nwbfile.add_acquisition(new_time_series) - - results = list(get_configurable_datasets(nwbfile=nwbfile)) - - assert len(results) == 1 - - result = results[0] - assert isinstance(result, ConfigurableDataset) - assert result.object_name == "NewTimeSeries" - assert result.field == "data" - assert result.maxshape == array.shape - assert result.dtype == str(array.dtype) # TODO: add tests for if source specification was np.dtype et al. diff --git a/tests/test_minimal/test_tools/test_backend_and_dataset_configuration/test_get_default_backend_configuration.py b/tests/test_minimal/test_tools/test_backend_and_dataset_configuration/test_get_default_backend_configuration.py new file mode 100644 index 000000000..2c15cf94c --- /dev/null +++ b/tests/test_minimal/test_tools/test_backend_and_dataset_configuration/test_get_default_backend_configuration.py @@ -0,0 +1,231 @@ +"""Integration tests for `get_default_backend_configuration`.""" +from io import StringIO +from pathlib import Path +from unittest.mock import patch + +import numpy as np +import pytest +from hdmf_zarr import NWBZarrIO +from pynwb import NWBHDF5IO, NWBFile +from pynwb.testing.mock.base import mock_TimeSeries +from pynwb.testing.mock.file import mock_NWBFile + +from neuroconv.tools.nwb_helpers import ( + HDF5BackendConfiguration, + ZarrBackendConfiguration, + get_default_backend_configuration, + get_module, +) + + +def generate_complex_nwbfile() -> NWBFile: + nwbfile = mock_NWBFile() + + raw_array = np.array([[1, 2, 3], [4, 5, 6]]) + raw_time_series = mock_TimeSeries(name="RawTimeSeries", data=raw_array) + nwbfile.add_acquisition(raw_time_series) + + number_of_trials = 10 + for start_time, stop_time in zip( + np.linspace(start=0.0, stop=10.0, num=number_of_trials), np.linspace(start=1.0, stop=11.0, num=number_of_trials) + ): + nwbfile.add_trial(start_time=start_time, stop_time=stop_time) + + ecephys_module = get_module(nwbfile=nwbfile, name="ecephys") + processed_array = np.array([[7.0, 8.0], [9.0, 10.0], [11.0, 12.0], [13.0, 14.0]]) + processed_time_series = mock_TimeSeries(name="ProcessedTimeSeries", data=processed_array) + ecephys_module.add(processed_time_series) + + return nwbfile + + +@pytest.fixture(scope="session") +def hdf5_nwbfile_path(tmpdir_factory): + nwbfile_path = tmpdir_factory.mktemp("data").join("test_default_backend_configuration_hdf5_nwbfile.nwb.h5") + if not Path(nwbfile_path).exists(): + nwbfile = generate_complex_nwbfile() + with NWBHDF5IO(path=str(nwbfile_path), mode="w") as io: + io.write(nwbfile) + return str(nwbfile_path) + + +@pytest.fixture(scope="session") +def zarr_nwbfile_path(tmpdir_factory): + nwbfile_path = tmpdir_factory.mktemp("data").join("test_default_backend_configuration_hdf5_nwbfile.nwb.zarr") + if not Path(nwbfile_path).exists(): + nwbfile = generate_complex_nwbfile() + with 
NWBZarrIO(path=str(nwbfile_path), mode="w") as io: + io.write(nwbfile) + return str(nwbfile_path) + + +def test_complex_hdf5(hdf5_nwbfile_path): + with NWBHDF5IO(path=hdf5_nwbfile_path, mode="a") as io: + nwbfile = io.read() + + raw_array = np.array([[11, 21, 31], [41, 51, 61]]) + raw_time_series = mock_TimeSeries(name="NewRawTimeSeries", data=raw_array) + nwbfile.add_acquisition(raw_time_series) + + number_of_epochs = 5 + for start_time, stop_time in zip( + np.linspace(start=0.0, stop=10.0, num=number_of_epochs), + np.linspace(start=1.0, stop=11.0, num=number_of_epochs), + ): + nwbfile.add_epoch(start_time=start_time, stop_time=stop_time) + + ecephys_module = get_module(nwbfile=nwbfile, name="ecephys") + processed_array = np.array([[7.1, 8.1], [9.1, 10.1], [11.1, 12.1], [13.1, 14.1]]) + processed_time_series = mock_TimeSeries(name="NewProcessedTimeSeries", data=processed_array) + ecephys_module.add(processed_time_series) + + backend_configuration = get_default_backend_configuration(nwbfile=nwbfile, backend="hdf5") + + assert isinstance(backend_configuration, HDF5BackendConfiguration) + + dataset_configurations = backend_configuration.dataset_configurations + assert len(dataset_configurations) == 4 + assert "acquisition/NewRawTimeSeries/data" in dataset_configurations + assert "epochs/start_time/data" in dataset_configurations + assert "epochs/stop_time/data" in dataset_configurations + assert "processing/ecephys/NewProcessedTimeSeries/data" in dataset_configurations + + # Best summary test of expected output is the printout + with patch("sys.stdout", new=StringIO()) as stdout: + print(backend_configuration) + + expected_print = """Configurable datasets identified using the hdf5 backend +------------------------------------------------------- +epochs/start_time/data + maxshape : (5,) + dtype : float64 + + chunk shape : (5,) + buffer shape : (5,) + compression method : gzip + compression options : None + + +epochs/stop_time/data + maxshape : (5,) + dtype : float64 + + chunk shape : (5,) + buffer shape : (5,) + compression method : gzip + compression options : None + + +acquisition/NewRawTimeSeries/data + maxshape : (2, 3) + dtype : int32 + + chunk shape : (2, 3) + buffer shape : (2, 3) + compression method : gzip + compression options : None + + +processing/ecephys/NewProcessedTimeSeries/data + maxshape : (4, 2) + dtype : float64 + + chunk shape : (4, 2) + buffer shape : (4, 2) + compression method : gzip + compression options : None + + + +""" + assert stdout.getvalue() == expected_print + + +def test_complex_zarr(zarr_nwbfile_path): + with NWBZarrIO(path=zarr_nwbfile_path, mode="a") as io: + nwbfile = io.read() + + raw_array = np.array([[11, 21, 31], [41, 51, 61]]) + raw_time_series = mock_TimeSeries(name="NewRawTimeSeries", data=raw_array) + nwbfile.add_acquisition(raw_time_series) + + number_of_epochs = 5 + for start_time, stop_time in zip( + np.linspace(start=0.0, stop=10.0, num=number_of_epochs), + np.linspace(start=1.0, stop=11.0, num=number_of_epochs), + ): + nwbfile.add_epoch(start_time=start_time, stop_time=stop_time) + + ecephys_module = get_module(nwbfile=nwbfile, name="ecephys") + processed_array = np.array([[7.1, 8.1], [9.1, 10.1], [11.1, 12.1], [13.1, 14.1]]) + processed_time_series = mock_TimeSeries(name="NewProcessedTimeSeries", data=processed_array) + ecephys_module.add(processed_time_series) + + backend_configuration = get_default_backend_configuration(nwbfile=nwbfile, backend="zarr") + + assert isinstance(backend_configuration, ZarrBackendConfiguration) + + 
dataset_configurations = backend_configuration.dataset_configurations + assert len(dataset_configurations) == 4 + assert "acquisition/NewRawTimeSeries/data" in dataset_configurations + assert "epochs/start_time/data" in dataset_configurations + assert "epochs/stop_time/data" in dataset_configurations + assert "processing/ecephys/NewProcessedTimeSeries/data" in dataset_configurations + + # Best summary test of expected output is the printout + with patch("sys.stdout", new=StringIO()) as stdout: + print(backend_configuration) + + expected_print = """Configurable datasets identified using the zarr backend +------------------------------------------------------- +epochs/start_time/data + maxshape : (5,) + dtype : float64 + + chunk shape : (5,) + buffer shape : (5,) + compression method : gzip + compression options : None + filter methods : None + filter options : None + + +epochs/stop_time/data + maxshape : (5,) + dtype : float64 + + chunk shape : (5,) + buffer shape : (5,) + compression method : gzip + compression options : None + filter methods : None + filter options : None + + +acquisition/NewRawTimeSeries/data + maxshape : (2, 3) + dtype : int32 + + chunk shape : (2, 3) + buffer shape : (2, 3) + compression method : gzip + compression options : None + filter methods : None + filter options : None + + +processing/ecephys/NewProcessedTimeSeries/data + maxshape : (4, 2) + dtype : float64 + + chunk shape : (4, 2) + buffer shape : (4, 2) + compression method : gzip + compression options : None + filter methods : None + filter options : None + + + +""" + assert stdout.getvalue() == expected_print diff --git a/tests/test_minimal/test_tools/test_backend_and_dataset_configuration/test_get_default_dataset_configurations.py b/tests/test_minimal/test_tools/test_backend_and_dataset_configuration/test_get_default_dataset_configurations.py new file mode 100644 index 000000000..be0c6f0d8 --- /dev/null +++ b/tests/test_minimal/test_tools/test_backend_and_dataset_configuration/test_get_default_dataset_configurations.py @@ -0,0 +1,349 @@ +"""Unit tests for `get_default_dataset_configurations`.""" +import numpy as np +from hdmf.common import VectorData +from hdmf.data_utils import DataChunkIterator +from pynwb.base import DynamicTable +from pynwb.image import ImageSeries +from pynwb.testing.mock.base import mock_TimeSeries +from pynwb.testing.mock.file import mock_NWBFile + +from neuroconv.tools.hdmf import SliceableDataChunkIterator +from neuroconv.tools.nwb_helpers import ( + HDF5DatasetConfiguration, + ZarrDatasetConfiguration, + get_default_dataset_configurations, +) + + +def test_unwrapped_time_series_hdf5(): + array = np.array([[1, 2, 3], [4, 5, 6]]) + + nwbfile = mock_NWBFile() + time_series = mock_TimeSeries(name="TestTimeSeries", data=array) + nwbfile.add_acquisition(time_series) + + dataset_configurations = list(get_default_dataset_configurations(nwbfile=nwbfile, backend="hdf5")) + + assert len(dataset_configurations) == 1 + + dataset_configuration = dataset_configurations[0] + assert isinstance(dataset_configuration, HDF5DatasetConfiguration) + assert dataset_configuration.dataset_info.object_id == time_series.object_id + assert dataset_configuration.dataset_info.location == "acquisition/TestTimeSeries/data" + assert dataset_configuration.dataset_info.maxshape == array.shape + assert dataset_configuration.dataset_info.dtype == array.dtype + assert dataset_configuration.chunk_shape == array.shape + assert dataset_configuration.buffer_shape == array.shape + assert 
dataset_configuration.compression_method == "gzip" + assert dataset_configuration.compression_options is None + + +def test_unwrapped_time_series_zarr(): + array = np.array([[1, 2, 3], [4, 5, 6]]) + + nwbfile = mock_NWBFile() + time_series = mock_TimeSeries(name="TestTimeSeries", data=array) + nwbfile.add_acquisition(time_series) + + dataset_configurations = list(get_default_dataset_configurations(nwbfile=nwbfile, backend="zarr")) + + assert len(dataset_configurations) == 1 + + dataset_configuration = dataset_configurations[0] + assert isinstance(dataset_configuration, ZarrDatasetConfiguration) + assert dataset_configuration.dataset_info.object_id == time_series.object_id + assert dataset_configuration.dataset_info.location == "acquisition/TestTimeSeries/data" + assert dataset_configuration.dataset_info.maxshape == array.shape + assert dataset_configuration.dataset_info.dtype == array.dtype + assert dataset_configuration.chunk_shape == array.shape + assert dataset_configuration.buffer_shape == array.shape + assert dataset_configuration.compression_method == "gzip" + assert dataset_configuration.compression_options is None + assert dataset_configuration.filter_methods is None + assert dataset_configuration.filter_options is None + + +def test_generic_iterator_wrapped_time_series_hdf5(): + array = np.array([[1, 2, 3], [4, 5, 6]]) + + nwbfile = mock_NWBFile() + time_series = mock_TimeSeries(name="TestTimeSeries", data=SliceableDataChunkIterator(data=array)) + nwbfile.add_acquisition(time_series) + + dataset_configurations = list(get_default_dataset_configurations(nwbfile=nwbfile, backend="hdf5")) + + assert len(dataset_configurations) == 1 + + dataset_configuration = dataset_configurations[0] + assert isinstance(dataset_configuration, HDF5DatasetConfiguration) + assert dataset_configuration.dataset_info.object_id == time_series.object_id + assert dataset_configuration.dataset_info.location == "acquisition/TestTimeSeries/data" + assert dataset_configuration.dataset_info.maxshape == array.shape + assert dataset_configuration.dataset_info.dtype == array.dtype + assert dataset_configuration.chunk_shape == array.shape + assert dataset_configuration.buffer_shape == array.shape + assert dataset_configuration.compression_method == "gzip" + assert dataset_configuration.compression_options is None + + +def test_classic_iterator_wrapped_simple_time_series_zarr(): + array = np.array([[1, 2, 3], [4, 5, 6]]) + + nwbfile = mock_NWBFile() + time_series = mock_TimeSeries(name="TestTimeSeries", data=DataChunkIterator(data=array)) + nwbfile.add_acquisition(time_series) + + dataset_configurations = list(get_default_dataset_configurations(nwbfile=nwbfile, backend="zarr")) + + assert len(dataset_configurations) == 1 + + dataset_configuration = dataset_configurations[0] + assert isinstance(dataset_configuration, ZarrDatasetConfiguration) + assert dataset_configuration.dataset_info.object_id == time_series.object_id + assert dataset_configuration.dataset_info.location == "acquisition/TestTimeSeries/data" + assert dataset_configuration.dataset_info.maxshape == array.shape + assert dataset_configuration.dataset_info.dtype == array.dtype + assert dataset_configuration.chunk_shape == array.shape + assert dataset_configuration.buffer_shape == array.shape + assert dataset_configuration.compression_method == "gzip" + assert dataset_configuration.compression_options is None + assert dataset_configuration.filter_methods is None + assert dataset_configuration.filter_options is None + + +def 
test_classic_iterator_wrapped_time_series_hdf5(): + array = np.array([[1, 2, 3], [4, 5, 6]]) + + nwbfile = mock_NWBFile() + time_series = mock_TimeSeries(name="TestTimeSeries", data=DataChunkIterator(data=array)) + nwbfile.add_acquisition(time_series) + + dataset_configurations = list(get_default_dataset_configurations(nwbfile=nwbfile, backend="hdf5")) + + assert len(dataset_configurations) == 1 + + dataset_configuration = dataset_configurations[0] + assert isinstance(dataset_configuration, HDF5DatasetConfiguration) + assert dataset_configuration.dataset_info.object_id == time_series.object_id + assert dataset_configuration.dataset_info.location == "acquisition/TestTimeSeries/data" + assert dataset_configuration.dataset_info.maxshape == array.shape + assert dataset_configuration.dataset_info.dtype == array.dtype + assert dataset_configuration.chunk_shape == array.shape + assert dataset_configuration.buffer_shape == array.shape + assert dataset_configuration.compression_method == "gzip" + assert dataset_configuration.compression_options is None + + +def test_generic_iterator_wrapped_simple_time_series_zarr(): + array = np.array([[1, 2, 3], [4, 5, 6]]) + + nwbfile = mock_NWBFile() + time_series = mock_TimeSeries(name="TestTimeSeries", data=SliceableDataChunkIterator(data=array)) + nwbfile.add_acquisition(time_series) + + dataset_configurations = list(get_default_dataset_configurations(nwbfile=nwbfile, backend="zarr")) + + assert len(dataset_configurations) == 1 + + dataset_configuration = dataset_configurations[0] + assert isinstance(dataset_configuration, ZarrDatasetConfiguration) + assert dataset_configuration.dataset_info.object_id == time_series.object_id + assert dataset_configuration.dataset_info.location == "acquisition/TestTimeSeries/data" + assert dataset_configuration.dataset_info.maxshape == array.shape + assert dataset_configuration.dataset_info.dtype == array.dtype + assert dataset_configuration.chunk_shape == array.shape + assert dataset_configuration.buffer_shape == array.shape + assert dataset_configuration.compression_method == "gzip" + assert dataset_configuration.compression_options is None + assert dataset_configuration.filter_methods is None + assert dataset_configuration.filter_options is None + + +def test_external_image_series_hdf5(): + nwbfile = mock_NWBFile() + image_series = ImageSeries(name="TestImageSeries", external_file=[""], rate=1.0) + nwbfile.add_acquisition(image_series) + + dataset_configurations = list(get_default_dataset_configurations(nwbfile=nwbfile, backend="hdf5")) + + assert len(dataset_configurations) == 0 + + +def test_external_image_series_zarr(): + nwbfile = mock_NWBFile() + image_series = ImageSeries(name="TestImageSeries", external_file=[""], rate=1.0) + nwbfile.add_acquisition(image_series) + + dataset_configurations = list(get_default_dataset_configurations(nwbfile=nwbfile, backend="zarr")) + + assert len(dataset_configurations) == 0 + + +def test_unwrapped_dynamic_table_hdf5(): + array = np.array([0.1, 0.2, 0.3]) + + nwbfile = mock_NWBFile() + column = VectorData(name="TestColumn", description="", data=array.squeeze()) + dynamic_table = DynamicTable(name="TestDynamicTable", description="", columns=[column]) + nwbfile.add_acquisition(dynamic_table) + + dataset_configurations = list(get_default_dataset_configurations(nwbfile=nwbfile, backend="hdf5")) + + assert len(dataset_configurations) == 1 + + dataset_configuration = dataset_configurations[0] + assert isinstance(dataset_configuration, HDF5DatasetConfiguration) + assert 
dataset_configuration.dataset_info.object_id == column.object_id
+    assert dataset_configuration.dataset_info.location == "acquisition/TestDynamicTable/TestColumn/data"
+    assert dataset_configuration.dataset_info.maxshape == array.shape
+    assert dataset_configuration.dataset_info.dtype == array.dtype
+    assert dataset_configuration.chunk_shape == array.shape
+    assert dataset_configuration.buffer_shape == array.shape
+    assert dataset_configuration.compression_method == "gzip"
+    assert dataset_configuration.compression_options is None
+
+
+def test_unwrapped_dynamic_table_zarr():
+    array = np.array([0.1, 0.2, 0.3])
+
+    nwbfile = mock_NWBFile()
+    column = VectorData(name="TestColumn", description="", data=array.squeeze())
+    dynamic_table = DynamicTable(name="TestDynamicTable", description="", columns=[column])
+    nwbfile.add_acquisition(dynamic_table)
+
+    dataset_configurations = list(get_default_dataset_configurations(nwbfile=nwbfile, backend="zarr"))
+
+    assert len(dataset_configurations) == 1
+
+    dataset_configuration = dataset_configurations[0]
+    assert isinstance(dataset_configuration, ZarrDatasetConfiguration)
+    assert dataset_configuration.dataset_info.object_id == column.object_id
+    assert dataset_configuration.dataset_info.location == "acquisition/TestDynamicTable/TestColumn/data"
+    assert dataset_configuration.dataset_info.maxshape == array.shape
+    assert dataset_configuration.dataset_info.dtype == array.dtype
+    assert dataset_configuration.chunk_shape == array.shape
+    assert dataset_configuration.buffer_shape == array.shape
+    assert dataset_configuration.compression_method == "gzip"
+    assert dataset_configuration.compression_options is None
+    assert dataset_configuration.filter_methods is None
+    assert dataset_configuration.filter_options is None
+
+
+def test_generic_iterator_wrapped_dynamic_table_hdf5():
+    array = np.array([0.1, 0.2, 0.3])
+
+    nwbfile = mock_NWBFile()
+    column = VectorData(name="TestColumn", description="", data=SliceableDataChunkIterator(data=array.squeeze()))
+    dynamic_table = DynamicTable(
+        name="TestDynamicTable",
+        description="",
+        id=list(range(array.shape[0])),  # Need to include ID since the data of the column is not wrapped in an IO
+        columns=[column],
+    )
+    nwbfile.add_acquisition(dynamic_table)
+
+    dataset_configurations = list(get_default_dataset_configurations(nwbfile=nwbfile, backend="hdf5"))
+
+    assert len(dataset_configurations) == 1
+
+    dataset_configuration = dataset_configurations[0]
+    assert isinstance(dataset_configuration, HDF5DatasetConfiguration)
+    assert dataset_configuration.dataset_info.object_id == column.object_id
+    assert dataset_configuration.dataset_info.location == "acquisition/TestDynamicTable/TestColumn/data"
+    assert dataset_configuration.dataset_info.maxshape == (array.shape[0],)
+    assert dataset_configuration.dataset_info.dtype == array.dtype
+    assert dataset_configuration.chunk_shape == array.shape
+    assert dataset_configuration.buffer_shape == array.shape
+    assert dataset_configuration.compression_method == "gzip"
+    assert dataset_configuration.compression_options is None
+
+
+def test_generic_iterator_wrapped_dynamic_table_zarr():
+    array = np.array([0.1, 0.2, 0.3])
+
+    nwbfile = mock_NWBFile()
+    column = VectorData(name="TestColumn", description="", data=SliceableDataChunkIterator(data=array.squeeze()))
+    dynamic_table = DynamicTable(
+        name="TestDynamicTable",
+        description="",
+        id=list(range(array.shape[0])),  # Need to include ID since the data of the column is not wrapped in an IO
+        columns=[column],
+    )
+    nwbfile.add_acquisition(dynamic_table)
+
+    dataset_configurations = list(get_default_dataset_configurations(nwbfile=nwbfile, backend="zarr"))
+
+    assert len(dataset_configurations) == 1
+
+    dataset_configuration = dataset_configurations[0]
+    assert isinstance(dataset_configuration, ZarrDatasetConfiguration)
+    assert dataset_configuration.dataset_info.object_id == column.object_id
+    assert dataset_configuration.dataset_info.location == "acquisition/TestDynamicTable/TestColumn/data"
+    assert dataset_configuration.dataset_info.maxshape == (array.shape[0],)
+    assert dataset_configuration.dataset_info.dtype == array.dtype
+    assert dataset_configuration.chunk_shape == array.shape
+    assert dataset_configuration.buffer_shape == array.shape
+    assert dataset_configuration.compression_method == "gzip"
+    assert dataset_configuration.compression_options is None
+    assert dataset_configuration.filter_methods is None
+    assert dataset_configuration.filter_options is None
+
+
+def test_classic_iterator_wrapped_dynamic_table_hdf5():
+    array = np.array([0.1, 0.2, 0.3])
+
+    nwbfile = mock_NWBFile()
+    column = VectorData(name="TestColumn", description="", data=DataChunkIterator(data=array.squeeze()))
+    dynamic_table = DynamicTable(
+        name="TestDynamicTable",
+        description="",
+        id=list(range(array.shape[0])),  # Need to include ID since the data of the column is not wrapped in an IO
+        columns=[column],
+    )
+    nwbfile.add_acquisition(dynamic_table)
+
+    dataset_configurations = list(get_default_dataset_configurations(nwbfile=nwbfile, backend="hdf5"))
+
+    assert len(dataset_configurations) == 1
+
+    dataset_configuration = dataset_configurations[0]
+    assert isinstance(dataset_configuration, HDF5DatasetConfiguration)
+    assert dataset_configuration.dataset_info.object_id == column.object_id
+    assert dataset_configuration.dataset_info.location == "acquisition/TestDynamicTable/TestColumn/data"
+    assert dataset_configuration.dataset_info.maxshape == (array.shape[0],)
+    assert dataset_configuration.dataset_info.dtype == array.dtype
+    assert dataset_configuration.chunk_shape == array.shape
+    assert dataset_configuration.buffer_shape == array.shape
+    assert dataset_configuration.compression_method == "gzip"
+    assert dataset_configuration.compression_options is None
+
+
+def test_classic_iterator_wrapped_dynamic_table_zarr():
+    array = np.array([0.1, 0.2, 0.3])
+
+    nwbfile = mock_NWBFile()
+    column = VectorData(name="TestColumn", description="", data=DataChunkIterator(data=array.squeeze()))
+    dynamic_table = DynamicTable(
+        name="TestDynamicTable",
+        description="",
+        id=list(range(array.shape[0])),  # Need to include ID since the data of the column is not wrapped in an IO
+        columns=[column],
+    )
+    nwbfile.add_acquisition(dynamic_table)
+
+    dataset_configurations = list(get_default_dataset_configurations(nwbfile=nwbfile, backend="zarr"))
+
+    assert len(dataset_configurations) == 1
+
+    dataset_configuration = dataset_configurations[0]
+    assert isinstance(dataset_configuration, ZarrDatasetConfiguration)
+    assert dataset_configuration.dataset_info.object_id == column.object_id
+    assert dataset_configuration.dataset_info.location == "acquisition/TestDynamicTable/TestColumn/data"
+    assert dataset_configuration.dataset_info.maxshape == (array.shape[0],)
+    assert dataset_configuration.dataset_info.dtype == array.dtype
+    assert dataset_configuration.chunk_shape == array.shape
+    assert dataset_configuration.buffer_shape == array.shape
+    assert dataset_configuration.compression_method == "gzip"
+    assert dataset_configuration.compression_options is None
+    assert dataset_configuration.filter_methods is None
+    assert dataset_configuration.filter_options is None
diff --git a/tests/test_minimal/test_tools/test_backend_and_dataset_configuration/test_get_default_dataset_configurations_appended_files.py b/tests/test_minimal/test_tools/test_backend_and_dataset_configuration/test_get_default_dataset_configurations_appended_files.py
new file mode 100644
index 000000000..bac169bc2
--- /dev/null
+++ b/tests/test_minimal/test_tools/test_backend_and_dataset_configuration/test_get_default_dataset_configurations_appended_files.py
@@ -0,0 +1,147 @@
+"""
+Unit tests for `get_default_dataset_configurations` operating on already written files open in append mode.
+
+Mostly testing that the right objects are skipped from identification as candidates for configuration.
+"""
+from pathlib import Path
+
+import numpy as np
+import pytest
+from hdmf.common import VectorData
+from hdmf_zarr import NWBZarrIO
+from pynwb import NWBHDF5IO, NWBFile
+from pynwb.base import DynamicTable
+from pynwb.testing.mock.base import mock_TimeSeries
+from pynwb.testing.mock.file import mock_NWBFile
+
+from neuroconv.tools.nwb_helpers import (
+    HDF5DatasetConfiguration,
+    ZarrDatasetConfiguration,
+    get_default_dataset_configurations,
+)
+
+
+def generate_nwbfile_with_existing_time_series() -> NWBFile:
+    nwbfile = mock_NWBFile()
+    array = np.array([[1, 2, 3], [4, 5, 6]])
+    time_series = mock_TimeSeries(name="ExistingTimeSeries", data=array)
+    nwbfile.add_acquisition(time_series)
+    return nwbfile
+
+
+@pytest.fixture(scope="session")
+def hdf5_nwbfile_path(tmpdir_factory):
+    nwbfile_path = tmpdir_factory.mktemp("data").join("test_default_dataset_configurations_hdf5_nwbfile_.nwb.h5")
+    if not Path(nwbfile_path).exists():
+        nwbfile = generate_nwbfile_with_existing_time_series()
+        with NWBHDF5IO(path=str(nwbfile_path), mode="w") as io:
+            io.write(nwbfile)
+    return str(nwbfile_path)
+
+
+@pytest.fixture(scope="session")
+def zarr_nwbfile_path(tmpdir_factory):
+    nwbfile_path = tmpdir_factory.mktemp("data").join("test_default_dataset_configurations_zarr_nwbfile.nwb.zarr")
+    if not Path(nwbfile_path).exists():
+        nwbfile = generate_nwbfile_with_existing_time_series()
+        with NWBZarrIO(path=str(nwbfile_path), mode="w") as io:
+            io.write(nwbfile)
+    return str(nwbfile_path)
+
+
+def test_unwrapped_time_series_hdf5(hdf5_nwbfile_path):
+    array = np.array([[1, 2, 3], [4, 5, 6]])
+
+    with NWBHDF5IO(path=hdf5_nwbfile_path, mode="a") as io:
+        nwbfile = io.read()
+        new_time_series = mock_TimeSeries(name="NewTimeSeries", data=array)
+        nwbfile.add_acquisition(new_time_series)
+        dataset_configurations = list(get_default_dataset_configurations(nwbfile=nwbfile, backend="hdf5"))
+
+    assert len(dataset_configurations) == 1
+
+    dataset_configuration = dataset_configurations[0]
+    assert isinstance(dataset_configuration, HDF5DatasetConfiguration)
+    assert dataset_configuration.dataset_info.object_id == new_time_series.object_id
+    assert dataset_configuration.dataset_info.location == "acquisition/NewTimeSeries/data"
+    assert dataset_configuration.dataset_info.maxshape == array.shape
+    assert dataset_configuration.dataset_info.dtype == array.dtype
+    assert dataset_configuration.chunk_shape == array.shape
+    assert dataset_configuration.buffer_shape == array.shape
+    assert dataset_configuration.compression_method == "gzip"
+    assert dataset_configuration.compression_options is None
+
+
+def test_unwrapped_time_series_zarr(zarr_nwbfile_path):
+    array = np.array([[1, 2, 3], [4, 5, 6]])
+
+    with NWBZarrIO(path=zarr_nwbfile_path, mode="a") as io:
+        nwbfile = io.read()
+        new_time_series = mock_TimeSeries(name="NewTimeSeries", data=array)
+        nwbfile.add_acquisition(new_time_series)
+        dataset_configurations = list(get_default_dataset_configurations(nwbfile=nwbfile, backend="zarr"))
+
+    assert len(dataset_configurations) == 1
+
+    dataset_configuration = dataset_configurations[0]
+    assert isinstance(dataset_configuration, ZarrDatasetConfiguration)
+    assert dataset_configuration.dataset_info.object_id == new_time_series.object_id
+    assert dataset_configuration.dataset_info.location == "acquisition/NewTimeSeries/data"
+    assert dataset_configuration.dataset_info.maxshape == array.shape
+    assert dataset_configuration.dataset_info.dtype == array.dtype
+    assert dataset_configuration.chunk_shape == array.shape
+    assert dataset_configuration.buffer_shape == array.shape
+    assert dataset_configuration.compression_method == "gzip"
+    assert dataset_configuration.compression_options is None
+    assert dataset_configuration.filter_methods is None
+    assert dataset_configuration.filter_options is None
+
+
+def test_unwrapped_dynamic_table_hdf5(hdf5_nwbfile_path):
+    array = np.array([0.1, 0.2, 0.3])
+
+    with NWBHDF5IO(path=hdf5_nwbfile_path, mode="a") as io:
+        nwbfile = io.read()
+        column = VectorData(name="TestColumn", description="", data=array.squeeze())
+        dynamic_table = DynamicTable(name="TestDynamicTable", description="", columns=[column])
+        nwbfile.add_acquisition(dynamic_table)
+        dataset_configurations = list(get_default_dataset_configurations(nwbfile=nwbfile, backend="hdf5"))
+
+    assert len(dataset_configurations) == 1
+
+    dataset_configuration = dataset_configurations[0]
+    assert isinstance(dataset_configuration, HDF5DatasetConfiguration)
+    assert dataset_configuration.dataset_info.object_id == column.object_id
+    assert dataset_configuration.dataset_info.location == "acquisition/TestDynamicTable/TestColumn/data"
+    assert dataset_configuration.dataset_info.maxshape == array.shape
+    assert dataset_configuration.dataset_info.dtype == array.dtype
+    assert dataset_configuration.chunk_shape == array.shape
+    assert dataset_configuration.buffer_shape == array.shape
+    assert dataset_configuration.compression_method == "gzip"
+    assert dataset_configuration.compression_options is None
+
+
+def test_unwrapped_dynamic_table_zarr(zarr_nwbfile_path):
+    array = np.array([0.1, 0.2, 0.3])
+
+    with NWBZarrIO(path=zarr_nwbfile_path, mode="a") as io:
+        nwbfile = io.read()
+        column = VectorData(name="TestColumn", description="", data=array.squeeze())
+        dynamic_table = DynamicTable(name="TestDynamicTable", description="", columns=[column])
+        nwbfile.add_acquisition(dynamic_table)
+        dataset_configurations = list(get_default_dataset_configurations(nwbfile=nwbfile, backend="zarr"))
+
+    assert len(dataset_configurations) == 1
+
+    dataset_configuration = dataset_configurations[0]
+    assert isinstance(dataset_configuration, ZarrDatasetConfiguration)
+    assert dataset_configuration.dataset_info.object_id == column.object_id
+    assert dataset_configuration.dataset_info.location == "acquisition/TestDynamicTable/TestColumn/data"
+    assert dataset_configuration.dataset_info.maxshape == array.shape
+    assert dataset_configuration.dataset_info.dtype == array.dtype
+    assert dataset_configuration.chunk_shape == array.shape
+    assert dataset_configuration.buffer_shape == array.shape
+    assert dataset_configuration.compression_method == "gzip"
+    assert dataset_configuration.compression_options is None
+    assert dataset_configuration.filter_methods is None
+    assert dataset_configuration.filter_options is None