[Backend Configuration IIIb] Configure DynamicTable router for set_data_io #700

Merged Jan 4, 2024 (17 commits). Changes shown from 3 commits.
64 changes: 32 additions & 32 deletions src/neuroconv/tools/nwb_helpers/__init__.py
@@ -1,56 +1,56 @@
"""Collection of Pydantic models and helper functions for configuring dataset IO parameters for different backends."""
# Mark these imports as private to avoid polluting the namespace; only used in global BACKEND_NWB_IO mapping
from pynwb import NWBHDF5IO as _NWBHDF5IO
from hdmf_zarr import NWBZarrIO as _NWBZarrIO
from pynwb import NWBHDF5IO as _NWBHDF5IO

from ._backend_configuration import get_default_backend_configuration
from ._configuration_models._base_backend import BackendConfiguration
from ._configuration_models._base_dataset_io import DatasetInfo, DatasetIOConfiguration
from ._configuration_models._hdf5_backend import HDF5BackendConfiguration
from ._configuration_models._hdf5_dataset_io import (
AVAILABLE_HDF5_COMPRESSION_METHODS,
HDF5DatasetIOConfiguration,
AVAILABLE_HDF5_COMPRESSION_METHODS,
HDF5DatasetIOConfiguration,
)
from ._configuration_models._zarr_backend import ZarrBackendConfiguration
from ._configuration_models._zarr_dataset_io import (
AVAILABLE_ZARR_COMPRESSION_METHODS,
ZarrDatasetIOConfiguration,
AVAILABLE_ZARR_COMPRESSION_METHODS,
ZarrDatasetIOConfiguration,
)
from ._configure_backend import configure_backend
from ._dataset_configuration import get_default_dataset_io_configurations
from ._metadata_and_file_helpers import (
add_device_from_metadata,
get_default_nwbfile_metadata,
get_module,
make_nwbfile_from_metadata,
make_or_load_nwbfile,
add_device_from_metadata,
get_default_nwbfile_metadata,
get_module,
make_nwbfile_from_metadata,
make_or_load_nwbfile,
)

BACKEND_CONFIGURATIONS = dict(hdf5=HDF5BackendConfiguration, zarr=ZarrBackendConfiguration)
DATASET_IO_CONFIGURATIONS = dict(hdf5=HDF5DatasetIOConfiguration, zarr=ZarrDatasetIOConfiguration)
BACKEND_NWB_IO = dict(hdf5=_NWBHDF5IO, zarr=_NWBZarrIO)

__all__ = [
"AVAILABLE_HDF5_COMPRESSION_METHODS",
"AVAILABLE_ZARR_COMPRESSION_METHODS",
"BACKEND_CONFIGURATIONS",
"DATASET_IO_CONFIGURATIONS",
"BACKEND_NWB_IO",
"get_default_backend_configuration",
"get_default_dataset_io_configurations",
"configure_backend",
"BackendConfiguration",
"DatasetIOConfiguration",
"get_default_dataset_io_configurations",
"get_default_backend_configuration",
"add_device_from_metadata",
"get_default_nwbfile_metadata",
"get_module",
"make_nwbfile_from_metadata",
"make_or_load_nwbfile",
"DatasetInfo",
"HDF5BackendConfiguration",
"HDF5DatasetIOConfiguration",
"ZarrBackendConfiguration",
"ZarrDatasetIOConfiguration",
"AVAILABLE_HDF5_COMPRESSION_METHODS",
"AVAILABLE_ZARR_COMPRESSION_METHODS",
"BACKEND_CONFIGURATIONS",
"DATASET_IO_CONFIGURATIONS",
"BACKEND_NWB_IO",
"get_default_backend_configuration",
"get_default_dataset_io_configurations",
"configure_backend",
"BackendConfiguration",
"DatasetIOConfiguration",
"get_default_dataset_io_configurations",
"get_default_backend_configuration",
"add_device_from_metadata",
"get_default_nwbfile_metadata",
"get_module",
"make_nwbfile_from_metadata",
"make_or_load_nwbfile",
"DatasetInfo",
"HDF5BackendConfiguration",
"HDF5DatasetIOConfiguration",
"ZarrBackendConfiguration",
"ZarrDatasetIOConfiguration",
]
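
For orientation, a minimal usage sketch of how these exports are meant to be combined (it mirrors the tests further down; the backend choice, array shape, and output path are illustrative, not part of this PR):

import numpy as np
from pynwb.testing.mock.base import mock_TimeSeries
from pynwb.testing.mock.file import mock_NWBFile

from neuroconv.tools.nwb_helpers import (
    BACKEND_NWB_IO,
    configure_backend,
    get_default_backend_configuration,
)

backend = "hdf5"  # or "zarr"

nwbfile = mock_NWBFile()
nwbfile.add_acquisition(mock_TimeSeries(name="TestTimeSeries", data=np.zeros(shape=(1_000, 8), dtype="int16")))

# Build the default per-dataset configuration for the chosen backend...
backend_configuration = get_default_backend_configuration(nwbfile=nwbfile, backend=backend)
# ...apply it in place, wrapping each dataset in the backend's DataIO with those options...
configure_backend(nwbfile=nwbfile, backend_configuration=backend_configuration)
# ...and write with the matching IO class from the BACKEND_NWB_IO mapping.
with BACKEND_NWB_IO[backend](path="example.nwb", mode="w") as io:
    io.write(nwbfile)
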
42 changes: 21 additions & 21 deletions src/neuroconv/tools/nwb_helpers/_configure_backend.py
@@ -1,35 +1,35 @@
"""Collection of helper functions related to configuration of datasets dependent on backend."""
from typing import Union

from pynwb import NWBFile, TimeSeries
from hdmf.common import Data
from pynwb import NWBFile, TimeSeries

from ._configuration_models._hdf5_backend import HDF5BackendConfiguration
from ._configuration_models._zarr_backend import ZarrBackendConfiguration


def configure_backend(
nwbfile: NWBFile, backend_configuration: Union[HDF5BackendConfiguration, ZarrBackendConfiguration]
nwbfile: NWBFile, backend_configuration: Union[HDF5BackendConfiguration, ZarrBackendConfiguration]
) -> None:
"""Configure all datasets specified in the `backend_configuration` with their appropriate DataIO and options."""
nwbfile_objects = nwbfile.objects
"""Configure all datasets specified in the `backend_configuration` with their appropriate DataIO and options."""
nwbfile_objects = nwbfile.objects

data_io_class = backend_configuration.data_io_class
for dataset_configuration in backend_configuration.dataset_configurations.values():
object_id = dataset_configuration.dataset_info.object_id
dataset_name = dataset_configuration.dataset_info.dataset_name
data_io_kwargs = dataset_configuration.get_data_io_kwargs()
data_io_class = backend_configuration.data_io_class
for dataset_configuration in backend_configuration.dataset_configurations.values():
object_id = dataset_configuration.dataset_info.object_id
dataset_name = dataset_configuration.dataset_info.dataset_name
data_io_kwargs = dataset_configuration.get_data_io_kwargs()

# TODO: update buffer shape in iterator, if present
# TODO: update buffer shape in iterator, if present

if isinstance(nwbfile_objects[object_id], Data):
nwbfile_objects[object_id].set_data_io(data_io_class=data_io_class, data_io_kwargs=data_io_kwargs)
elif isinstance(nwbfile_objects[object_id], TimeSeries):
nwbfile_objects[object_id].set_data_io(
dataset_name=dataset_name, data_io_class=data_io_class, **data_io_kwargs
)
else: # Strictly speaking, it would be odd if a backend_configuration led to this, but might as well be safe
raise NotImplementedError(
f"Unsupported object type {type(nwbfile_objects[object_id])} for backend "
f"configuration of {nwbfile_objects[object_id].name}!"
)
if isinstance(nwbfile_objects[object_id], Data):
h-mayorquin marked this conversation as resolved.
Show resolved Hide resolved
nwbfile_objects[object_id].set_data_io(data_io_class=data_io_class, data_io_kwargs=data_io_kwargs)
elif isinstance(nwbfile_objects[object_id], TimeSeries):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens with the timestamps if they are present? Are they chunked compressed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ought to be

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are they Data or a TimeSeries? I guess that was my question.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are a dataset of the TimeSeries

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(it would enter line 29 here with dataset_name="timestamps")

nwbfile_objects[object_id].set_data_io(
dataset_name=dataset_name, data_io_class=data_io_class, **data_io_kwargs
)
else: # Strictly speaking, it would be odd if a backend_configuration led to this, but might as well be safe
raise NotImplementedError(
f"Unsupported object type {type(nwbfile_objects[object_id])} for backend "
f"configuration of {nwbfile_objects[object_id].name}!"
)
CodyCBakerPhD marked this conversation as resolved.
Show resolved Hide resolved
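
To make that exchange concrete, a hedged sketch of a TimeSeries written with explicit timestamps passing through this router; the dataset key checked below is an assumption about how the default configuration names the timestamps vector, not something asserted by this PR:

import numpy as np
from pynwb import TimeSeries
from pynwb.testing.mock.file import mock_NWBFile

from neuroconv.tools.nwb_helpers import configure_backend, get_default_backend_configuration

nwbfile = mock_NWBFile()
time_series = TimeSeries(
    name="TestTimeSeries",
    unit="a.u.",
    data=np.zeros(shape=(1_000, 8), dtype="int16"),
    timestamps=np.arange(1_000) / 30_000.0,  # explicit timestamps instead of starting_time/rate
)
nwbfile.add_acquisition(time_series)

backend_configuration = get_default_backend_configuration(nwbfile=nwbfile, backend="hdf5")
# If the defaults enumerate the timestamps vector (assumed key), configure_backend reaches the
# TimeSeries branch above with dataset_name="timestamps" for that entry.
print("acquisition/TestTimeSeries/timestamps" in backend_configuration.dataset_configurations)
configure_backend(nwbfile=nwbfile, backend_configuration=backend_configuration)
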
4 changes: 2 additions & 2 deletions src/neuroconv/tools/spikeinterface/spikeinterface.py
@@ -1,13 +1,13 @@
import uuid
import warnings
from collections import defaultdict
from numbers import Real
from pathlib import Path
from typing import List, Literal, Optional, Union

import numpy as np
import psutil
import pynwb
from hdmf.backends.hdf5.h5_utils import H5DataIO
from hdmf.data_utils import AbstractDataChunkIterator, DataChunkIterator
from packaging.version import Version
Review thread:

Collaborator: Maybe we could add ragged array tests. Does chunking make sense in those cases? I guess it is very difficult to define a good chunking pattern unless your entries are homogeneous.

Member Author: It would; I think we do have general configuration tests for this, just not for the configure_backend call, since in principle there is nothing at all different about those special columns that have _index in the name (they are still just VectorData at the end of the day).
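
As a hedged illustration of that point (the constructor pattern and the printed keys are assumptions for this sketch, not part of the PR), a ragged column is just a VectorData plus a companion VectorIndex, and both reach configure_backend as ordinary Data objects:

import numpy as np
from hdmf.common import DynamicTable, VectorData, VectorIndex
from pynwb.testing.mock.file import mock_NWBFile

from neuroconv.tools.nwb_helpers import configure_backend, get_default_backend_configuration

# Ragged column: flat values live in a VectorData, row boundaries in a companion VectorIndex.
column = VectorData(name="TestColumn", description="", data=np.arange(10, dtype="int16"))
column_index = VectorIndex(name="TestColumn_index", data=np.array([3, 6, 10]), target=column)
dynamic_table = DynamicTable(name="TestDynamicTable", description="", columns=[column, column_index])

nwbfile = mock_NWBFile()
nwbfile.add_acquisition(dynamic_table)

backend_configuration = get_default_backend_configuration(nwbfile=nwbfile, backend="hdf5")
# Both the column and its _index companion are plain VectorData as far as the router is concerned.
print(sorted(backend_configuration.dataset_configurations.keys()))
configure_backend(nwbfile=nwbfile, backend_configuration=backend_configuration)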

@@ -5,95 +5,95 @@
import numcodecs
import numpy as np
import pytest
from hdmf.common import DynamicTable, VectorData
from hdmf.data_utils import DataChunkIterator
from pynwb.testing.mock.base import mock_TimeSeries
from pynwb.testing.mock.file import mock_NWBFile

from neuroconv.tools.hdmf import SliceableDataChunkIterator
from neuroconv.tools.nwb_helpers import (
    BACKEND_NWB_IO,
    configure_backend,
    get_default_backend_configuration,
)


@pytest.mark.parametrize(
    "case_name,iterator,iterator_options",
    [
        ("unwrapped", lambda x: x, dict()),
        ("generic", SliceableDataChunkIterator, dict()),
        ("classic", DataChunkIterator, dict(iter_axis=1, buffer_size=30_000 * 5)),
        # Need to hardcode buffer size in classic case or else it takes forever...
    ],
)
@pytest.mark.parametrize("backend", ["hdf5", "zarr"])
def test_simple_time_series(
    tmpdir: Path, case_name: str, iterator: callable, iterator_options: dict, backend: Literal["hdf5", "zarr"]
):
    array = np.zeros(shape=(30_000 * 5, 384), dtype="int16")
    data = iterator(array, **iterator_options)

    nwbfile = mock_NWBFile()
    time_series = mock_TimeSeries(name="TestTimeSeries", data=data)
    nwbfile.add_acquisition(time_series)

    backend_configuration = get_default_backend_configuration(nwbfile=nwbfile, backend=backend)
    dataset_configuration = backend_configuration.dataset_configurations["acquisition/TestTimeSeries/data"]
    configure_backend(nwbfile=nwbfile, backend_configuration=backend_configuration)

    nwbfile_path = str(tmpdir / f"test_configure_defaults_{case_name}_time_series.nwb.{backend}")
    with BACKEND_NWB_IO[backend](path=nwbfile_path, mode="w") as io:
        io.write(nwbfile)

    with BACKEND_NWB_IO[backend](path=nwbfile_path, mode="r") as io:
        written_nwbfile = io.read()
        written_data = written_nwbfile.acquisition["TestTimeSeries"].data

        assert written_data.chunks == dataset_configuration.chunk_shape

        if backend == "hdf5":
            assert written_data.compression == "gzip"
        elif backend == "zarr":
            assert written_data.compressor == numcodecs.GZip(level=1)


@pytest.mark.parametrize(
    "case_name,iterator,iterator_options",
    [
        ("unwrapped", lambda x: x, dict()),
        ("generic", SliceableDataChunkIterator, dict()),
        ("classic", DataChunkIterator, dict(iter_axis=1, buffer_size=30_000 * 5)),
        # Need to hardcode buffer size in classic case or else it takes forever...
    ],
)
@pytest.mark.parametrize("backend", ["hdf5", "zarr"])
def test_simple_dynamic_table(
    tmpdir: Path, case_name: str, iterator: callable, iterator_options: dict, backend: Literal["hdf5", "zarr"]
):
    data = np.zeros(shape=(30_000 * 5, 384), dtype="int16")

    nwbfile = mock_NWBFile()
    dynamic_table = DynamicTable(
        name="TestDynamicTable", description="", columns=[VectorData(name="TestColumn", description="", data=data)]
    )
    nwbfile.add_acquisition(dynamic_table)

    backend_configuration = get_default_backend_configuration(nwbfile=nwbfile, backend=backend)
    dataset_configuration = backend_configuration.dataset_configurations["acquisition/TestDynamicTable/TestColumn/data"]
    configure_backend(nwbfile=nwbfile, backend_configuration=backend_configuration)

    nwbfile_path = str(tmpdir / f"test_configure_defaults_{case_name}_dynamic_table.nwb.{backend}")
    with BACKEND_NWB_IO[backend](path=nwbfile_path, mode="w") as io:
        io.write(nwbfile)

    with BACKEND_NWB_IO[backend](path=nwbfile_path, mode="r") as io:
        written_nwbfile = io.read()
        written_data = written_nwbfile.acquisition["TestDynamicTable"]["TestColumn"].data

        assert written_data.chunks == dataset_configuration.chunk_shape

        if backend == "hdf5":
            assert written_data.compression == "gzip"
        elif backend == "zarr":
            assert written_data.compressor == numcodecs.GZip(level=1)

Review thread on the write call (with BACKEND_NWB_IO[backend](path=nwbfile_path, mode="w") as io:) in test_simple_dynamic_table:

Collaborator: I would also avoid inlining the IO class here. I think it makes the code harder to read, but it is marginal.

Member Author: Sure.
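
For reference, a small sketch of the non-inlined form the reviewer is suggesting; it reuses the backend, nwbfile_path, and nwbfile names from the test above and changes readability only, not behavior:

# Bind the IO class to a name once instead of indexing the mapping inside each with-statement.
IOClass = BACKEND_NWB_IO[backend]

with IOClass(path=nwbfile_path, mode="w") as io:
    io.write(nwbfile)

with IOClass(path=nwbfile_path, mode="r") as io:
    written_nwbfile = io.read()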