diff --git a/src/roiextractors/extractorlist.py b/src/roiextractors/extractorlist.py index 9ec7262d..091265c2 100644 --- a/src/roiextractors/extractorlist.py +++ b/src/roiextractors/extractorlist.py @@ -15,7 +15,8 @@ from .extractors.tiffimagingextractors import ( TiffImagingExtractor, ScanImageTiffImagingExtractor, - BrukerTiffImagingExtractor, + BrukerTiffMultiPlaneImagingExtractor, + BrukerTiffSinglePlaneImagingExtractor, MicroManagerTiffImagingExtractor, ) from .extractors.sbximagingextractor import SbxImagingExtractor @@ -30,7 +31,8 @@ Hdf5ImagingExtractor, TiffImagingExtractor, ScanImageTiffImagingExtractor, - BrukerTiffImagingExtractor, + BrukerTiffMultiPlaneImagingExtractor, + BrukerTiffSinglePlaneImagingExtractor, MicroManagerTiffImagingExtractor, MiniscopeImagingExtractor, NwbImagingExtractor, diff --git a/src/roiextractors/extractors/tiffimagingextractors/__init__.py b/src/roiextractors/extractors/tiffimagingextractors/__init__.py index 29a2c587..c7d03795 100644 --- a/src/roiextractors/extractors/tiffimagingextractors/__init__.py +++ b/src/roiextractors/extractors/tiffimagingextractors/__init__.py @@ -1,4 +1,4 @@ from .tiffimagingextractor import TiffImagingExtractor from .scanimagetiffimagingextractor import ScanImageTiffImagingExtractor -from .brukertiffimagingextractor import BrukerTiffImagingExtractor +from .brukertiffimagingextractor import BrukerTiffMultiPlaneImagingExtractor, BrukerTiffSinglePlaneImagingExtractor from .micromanagertiffimagingextractor import MicroManagerTiffImagingExtractor diff --git a/src/roiextractors/extractors/tiffimagingextractors/brukertiffimagingextractor.py b/src/roiextractors/extractors/tiffimagingextractors/brukertiffimagingextractor.py index 3854417e..0f30f0ff 100644 --- a/src/roiextractors/extractors/tiffimagingextractors/brukertiffimagingextractor.py +++ b/src/roiextractors/extractors/tiffimagingextractors/brukertiffimagingextractor.py @@ -1,13 +1,17 @@ import logging +import re +from collections import Counter +from itertools import islice from pathlib import Path from types import ModuleType -from typing import Optional, Tuple, Union, List, Iterable, Dict +from typing import Optional, Tuple, Union, List, Dict from xml.etree import ElementTree import numpy as np +from ...multiimagingextractor import MultiImagingExtractor from ...imagingextractor import ImagingExtractor -from ...extraction_tools import PathType, get_package, DtypeType +from ...extraction_tools import PathType, get_package, DtypeType, ArrayType def filter_read_uic_tag_warnings(record): @@ -21,12 +25,82 @@ def _get_tiff_reader() -> ModuleType: return get_package(package_name="tifffile", installation_instructions="pip install tifffile") -class BrukerTiffImagingExtractor(ImagingExtractor): - extractor_name = "BrukerTiffImaging" +def _determine_frame_rate(element: ElementTree.Element, file_names: Optional[List[str]] = None) -> Union[float, None]: + """ + Determines the frame rate from the difference in relative timestamps of the frame elements. + """ + from neuroconv.utils import calculate_regular_series_rate + + frame_elements = element.findall(".//Frame") + if file_names: + frame_elements = [ + frame for frame in frame_elements for file in frame.findall("File") if file.attrib["filename"] in file_names + ] + + relative_times = [float(frame.attrib["relativeTime"]) for frame in frame_elements] + frame_rate = calculate_regular_series_rate(np.array(relative_times)) + + return frame_rate + + +def _determine_imaging_is_volumetric(folder_path: PathType) -> bool: + """ + Determines whether imaging is volumetric based on 'zDevice' configuration value. + The value is expected to be '1' for volumetric and '0' for single plane images. + """ + xml_root = _parse_xml(folder_path=folder_path) + z_device_element = xml_root.find(".//PVStateValue[@key='zDevice']") + is_volumetric = bool(int(z_device_element.attrib["value"])) + + return is_volumetric + + +def _parse_xml(folder_path: PathType) -> ElementTree.Element: + """ + Parses the XML configuration file into element tree and returns the root Element. + """ + folder_path = Path(folder_path) + xml_file_path = folder_path / f"{folder_path.name}.xml" + assert xml_file_path.is_file(), f"The XML configuration file is not found at '{folder_path}'." + tree = ElementTree.parse(xml_file_path) + return tree.getroot() + + +class BrukerTiffMultiPlaneImagingExtractor(MultiImagingExtractor): + extractor_name = "BrukerTiffMultiPlaneImaging" is_writable = True mode = "folder" - def __init__(self, folder_path: PathType): + @classmethod + def get_streams(cls, folder_path: PathType) -> dict: + natsort = get_package(package_name="natsort", installation_instructions="pip install natsort") + + xml_root = _parse_xml(folder_path=folder_path) + channel_names = [file.attrib["channelName"] for file in xml_root.findall(".//File")] + unique_channel_names = natsort.natsorted(set(channel_names)) + streams = dict(channel_streams=unique_channel_names) + streams["plane_streams"] = dict() + if not _determine_imaging_is_volumetric(folder_path=folder_path): + return streams + # The "channelName" can be any name that the experimenter sets (e.g. 'Ch1', 'Ch2', 'Green', 'Red') + # Use the identifier of a channel "channel" (e.g. 1, 2) to match it to the file name + channel_ids = [file.attrib["channel"] for file in xml_root.findall(".//File")] + unique_channel_ids = natsort.natsorted(set(channel_ids)) + for channel_id, channel_name in zip(unique_channel_ids, unique_channel_names): + plane_naming_pattern = rf"(?PCh{channel_id}_\d+)" + plane_stream_names = [ + re.search(plane_naming_pattern, file.attrib["filename"])["stream_name"] + for file in xml_root.findall(f".//File") + ] + unique_plane_stream_names = natsort.natsorted(set(plane_stream_names)) + streams["plane_streams"][channel_name] = unique_plane_stream_names + return streams + + def __init__( + self, + folder_path: PathType, + stream_name: Optional[str] = None, + ): """ The imaging extractor for the Bruker TIF image format. This format consists of multiple TIF image files (.ome.tif) and configuration files (.xml, .env). @@ -35,64 +109,190 @@ def __init__(self, folder_path: PathType): ---------- folder_path : PathType The path to the folder that contains the Bruker TIF image files (.ome.tif) and configuration files (.xml, .env). + stream_name: str, optional + The name of the recording channel (e.g. "Ch2"). """ self._tifffile = _get_tiff_reader() - super().__init__() + folder_path = Path(folder_path) + tif_file_paths = list(folder_path.glob("*.ome.tif")) + assert tif_file_paths, f"The TIF image files are missing from '{folder_path}'." + + assert _determine_imaging_is_volumetric(folder_path=folder_path), ( + f"{self.extractor_name}Extractor is for volumetric imaging. " + "For single imaging plane data use BrukerTiffSinglePlaneImagingExtractor." + ) + + streams = self.get_streams(folder_path=folder_path) + if stream_name is None: + if len(streams["channel_streams"]) > 1: + raise ValueError( + "More than one recording stream is detected! Please specify which stream you wish to load with the `stream_name` argument. " + "To see what streams are available, call `BrukerTiffMultiPlaneImagingExtractor.get_stream_names(folder_path=...)`." + ) + channel_stream_name = streams["channel_streams"][0] + stream_name = streams["plane_streams"][channel_stream_name][0] + + channel_stream_name = stream_name.split("_")[0] + plane_stream_names = streams["plane_streams"][channel_stream_name] + if stream_name is not None and stream_name not in plane_stream_names: + raise ValueError( + f"The selected stream '{stream_name}' is not in the available plane_streams '{plane_stream_names}'!" + ) + self.folder_path = Path(folder_path) - tif_file_paths = list(self.folder_path.glob("*.ome.tif")) - assert tif_file_paths, f"The TIF image files are missing from '{self.folder_path}'." - - self._xml_file_path = self.folder_path / f"{self.folder_path.name}.xml" - assert self._xml_file_path.is_file(), f"The XML configuration file is not found at '{self.folder_path}'." - - sequences = self._get_xml_root().findall("Sequence") - num_sequences = len(sequences) - # TODO: extend it to general plane index, but we need example data for it first - self._sequence = sequences[0] - self._num_frames = len(self._sequence.findall("./Frame")) - - files = self._sequence.findall(".//File") - self._file_paths = [file.attrib["filename"] for file in files] - if num_sequences != 1: - missing_files = [ - file_path for file_path in self._file_paths if self.folder_path / file_path not in tif_file_paths - ] - assert ( - not missing_files - ), f"Some of the TIF image files at '{self.folder_path}' are missing for this plane. The list of files that are missing: {missing_files}" - else: - assert len(self._file_paths) == len( - tif_file_paths - ), f"The number of TIF image files at '{self.folder_path}' should be equal to the number of frames ({len(self._file_paths)}) specified in the XML configuration file." - - with self._tifffile.TiffFile(self.folder_path / self._file_paths[0], _multifile=False) as tif: - self._height, self._width = tif.pages[0].shape - self._dtype = tif.pages[0].dtype - self.xml_metadata = self._get_xml_metadata() - self._sampling_frequency = 1 / float(self.xml_metadata["framePeriod"]) + self.stream_name = stream_name + self._num_planes_per_channel_stream = len(plane_stream_names) + + imaging_extractors = [] + for stream_name in plane_stream_names: + extractor = BrukerTiffSinglePlaneImagingExtractor(folder_path=folder_path, stream_name=stream_name) + imaging_extractors.append(extractor) + + super().__init__(imaging_extractors=imaging_extractors) + + self._num_frames = self._imaging_extractors[0].get_num_frames() + self._image_size = *self._imaging_extractors[0].get_image_size(), self._num_planes_per_channel_stream + self.xml_metadata = self._imaging_extractors[0].xml_metadata - channel_names = [file.attrib["channelName"] for file in files] - unique_channel_names = list(set(channel_names)) - self._channel_names = unique_channel_names + self._start_frames = [0] * self._num_planes_per_channel_stream + self._end_frames = [self._num_frames] * self._num_planes_per_channel_stream - def _get_xml_root(self): + def get_image_size(self) -> Tuple[int, int, int]: + return self._image_size + + def get_num_frames(self) -> int: + return self._imaging_extractors[0].get_num_frames() + + def get_sampling_frequency(self) -> float: + return self._imaging_extractors[0].get_sampling_frequency() * self._num_planes_per_channel_stream + + def get_frames(self, frame_idxs: ArrayType, channel: Optional[int] = 0) -> np.ndarray: + if isinstance(frame_idxs, (int, np.integer)): + frame_idxs = [frame_idxs] + frame_idxs = np.array(frame_idxs) + assert np.all(frame_idxs < self.get_num_frames()), "'frame_idxs' exceed number of frames" + + frames_shape = (len(frame_idxs),) + self.get_image_size() + frames = np.empty(shape=frames_shape, dtype=self.get_dtype()) + + for plane_ind, extractor in enumerate(self._imaging_extractors): + frames[..., plane_ind] = extractor.get_frames(frame_idxs) + + return frames + + def get_video( + self, start_frame: Optional[int] = None, end_frame: Optional[int] = None, channel: int = 0 + ) -> np.ndarray: + if channel != 0: + raise NotImplementedError( + f"MultiImagingExtractors for multiple channels have not yet been implemented! (Received '{channel}'." + ) + + start = start_frame if start_frame is not None else 0 + stop = end_frame if end_frame is not None else self.get_num_frames() + + video_shape = (stop - start,) + self.get_image_size() + video = np.empty(shape=video_shape, dtype=self.get_dtype()) + + for plane_ind, extractor in enumerate(self._imaging_extractors): + video[..., plane_ind] = extractor.get_video(start_frame=start, end_frame=stop) + + return video + + +class BrukerTiffSinglePlaneImagingExtractor(MultiImagingExtractor): + extractor_name = "BrukerTiffSinglePlaneImaging" + is_writable = True + mode = "folder" + + @classmethod + def get_streams(cls, folder_path: PathType) -> dict: + natsort = get_package(package_name="natsort", installation_instructions="pip install natsort") + xml_root = _parse_xml(folder_path=folder_path) + channel_names = [file.attrib["channelName"] for file in xml_root.findall(".//File")] + unique_channel_names = natsort.natsorted(set(channel_names)) + streams = dict(channel_streams=unique_channel_names) + return streams + + def __init__(self, folder_path: PathType, stream_name: Optional[str] = None): """ - Parses the XML configuration file into element tree and returns the root of this tree. + The imaging extractor for the Bruker TIF image format. + This format consists of multiple TIF image files (.ome.tif) and configuration files (.xml, .env). + + Parameters + ---------- + folder_path : PathType + The path to the folder that contains the Bruker TIF image files (.ome.tif) and configuration files (.xml, .env). + stream_name: str, optional + The name of the recording channel (e.g. "Ch2"). """ - tree = ElementTree.parse(self._xml_file_path) - return tree.getroot() + self._tifffile = _get_tiff_reader() + + folder_path = Path(folder_path) + tif_file_paths = list(folder_path.glob("*.ome.tif")) + assert tif_file_paths, f"The TIF image files are missing from '{folder_path}'." + + streams = self.get_streams(folder_path=folder_path) + if stream_name is None: + if len(streams["channel_streams"]) > 1: + raise ValueError( + "More than one recording stream is detected! Please specify which stream you wish to load with the `stream_name` argument. " + "To see what streams are available, call `BrukerTiffSinglePlaneImagingExtractor.get_stream_names(folder_path=...)`." + ) + stream_name = streams["channel_streams"][0] + + self.stream_name = stream_name + channel_stream_name = self.stream_name.split("_")[0] + if self.stream_name is not None and channel_stream_name not in streams["channel_streams"]: + raise ValueError( + f"The selected stream '{self.stream_name}' is not in the available channel_streams '{streams['channel_streams']}'!" + ) + + self._xml_root = _parse_xml(folder_path=folder_path) + file_elements = self._xml_root.findall(".//File") + file_names = [file.attrib["filename"] for file in file_elements] + file_names_for_stream = [file for file in file_names if self.stream_name in file] + # determine image shape and data type from first file + with self._tifffile.TiffFile(folder_path / file_names_for_stream[0], _multifile=False) as tif: + self._height, self._width = tif.pages[0].shape + self._dtype = tif.pages[0].dtype + + sequence_elements = self._xml_root.findall("Sequence") + # determine the true sampling frequency + # the "framePeriod" in the XML is not trusted (usually higher than the true frame rate) + frame_rate = _determine_frame_rate(element=self._xml_root, file_names=file_names_for_stream) + if frame_rate is None and len(sequence_elements) > 1: + frame_rate = _determine_frame_rate(element=sequence_elements[0], file_names=file_names_for_stream) + assert frame_rate is not None, "Could not determine the frame rate from the XML file." + self._sampling_frequency = frame_rate + self._channel_names = [self.stream_name.split("_")[0]] + + # count the number of occurrences of each file path and their names + # files that contain stacks of images (multi-page tiffs) will appear repeated (number of repetition is the number of frames in the tif file) + file_counts = Counter(file_names_for_stream) + + imaging_extractors = [] + for file_name, num_frames in file_counts.items(): + extractor = _BrukerTiffSinglePlaneImagingExtractor(file_path=str(Path(folder_path) / file_name)) + extractor._num_frames = num_frames + extractor._image_size = (self._height, self._width) + extractor._dtype = self._dtype + imaging_extractors.append(extractor) + + self.xml_metadata = self._get_xml_metadata() + + super().__init__(imaging_extractors=imaging_extractors) def _get_xml_metadata(self) -> Dict[str, Union[str, List[Dict[str, str]]]]: """ Parses the metadata in the root element that are under "PVStateValue" tag into a dictionary. """ - root = self._get_xml_root() xml_metadata = dict() - xml_metadata.update(**root.attrib) - for child in root.findall(".//PVStateValue"): + xml_metadata.update(**self._xml_root.attrib) + for child in self._xml_root.findall(".//PVStateValue"): metadata_root_key = child.attrib["key"] if "value" in child.attrib: if metadata_root_key in xml_metadata: @@ -121,12 +321,13 @@ def _get_xml_metadata(self) -> Dict[str, Union[str, List[Dict[str, str]]]]: ) return xml_metadata + def _check_consistency_between_imaging_extractors(self): + """Overrides the parent class method as none of the properties that are checked are from the sub-imaging extractors.""" + return True + def get_image_size(self) -> Tuple[int, int]: return self._height, self._width - def get_num_frames(self) -> int: - return self._num_frames - def get_sampling_frequency(self) -> float: return self._sampling_frequency @@ -134,33 +335,74 @@ def get_channel_names(self) -> List[str]: return self._channel_names def get_num_channels(self) -> int: - channel_names = self.get_channel_names() - return len(channel_names) + return 1 def get_dtype(self) -> DtypeType: return self._dtype - def _frames_iterator( - self, - start_frame: Optional[int] = None, - end_frame: Optional[int] = None, - ) -> Iterable[np.memmap]: - if start_frame is not None and end_frame is not None: - if end_frame == start_frame: - yield self._tifffile.memmap( - self.folder_path / self._file_paths[start_frame], mode="r", _multifile=False - ) - for file in self._file_paths[start_frame:end_frame]: - yield self._tifffile.memmap(self.folder_path / file, mode="r", _multifile=False) +class _BrukerTiffSinglePlaneImagingExtractor(ImagingExtractor): + extractor_name = "_BrukerTiffSinglePlaneImaging" + is_writable = True + mode = "file" + + SAMPLING_FREQ_ERROR = "The {}Extractor does not support retrieving the imaging rate." + CHANNEL_NAMES_ERROR = "The {}Extractor does not support retrieving the name of the channels." + DATA_TYPE_ERROR = "The {}Extractor does not support retrieving the data type." + + def __init__(self, file_path: PathType): + """ + The private imaging extractor for OME-TIF image format produced by Bruker, + which defines the get_video() method to return the requested frames from a given file. + This extractor is not meant to be used as a standalone ImagingExtractor. + + Parameters + ---------- + file_path : PathType + The path to the TIF image file (.ome.tif) + """ + self.tifffile = _get_tiff_reader() + self.file_path = file_path + + super().__init__() + + self._num_frames = None + self._image_size = None + self._dtype = None + + def get_num_frames(self) -> int: + return self._num_frames + + def get_num_channels(self) -> int: + return 1 + + def get_image_size(self) -> Tuple[int, int]: + return self._image_size + + def get_sampling_frequency(self): + raise NotImplementedError(self.SAMPLING_FREQ_ERROR.format(self.extractor_name)) + + def get_channel_names(self) -> list: + raise NotImplementedError(self.CHANNEL_NAMES_ERROR.format(self.extractor_name)) + + def get_dtype(self): + raise NotImplementedError(self.DATA_TYPE_ERROR.format(self.extractor_name)) def get_video( self, start_frame: Optional[int] = None, end_frame: Optional[int] = None, channel: int = 0 ) -> np.ndarray: - if channel != 0: - raise NotImplementedError( - f"The {self.extractor_name}Extractor does not currently support multiple color channels." - ) - frames = list(self._frames_iterator(start_frame=start_frame, end_frame=end_frame)) - video = np.stack(frames, axis=0) + with self.tifffile.TiffFile(self.file_path, _multifile=False) as tif: + pages = tif.pages + + if start_frame is not None and end_frame is not None and start_frame == end_frame: + return pages[start_frame].asarray() + + end_frame = end_frame or self.get_num_frames() + start_frame = start_frame or 0 + + image_shape = (end_frame - start_frame, *self.get_image_size()) + video = np.zeros(shape=image_shape, dtype=self._dtype) + for page_ind, page in enumerate(islice(pages, start_frame, end_frame)): + video[page_ind] = page.asarray() + return video diff --git a/tests/test_brukertiffimagingextactor.py b/tests/test_brukertiffimagingextactor.py index b6f976b3..a1e5361b 100644 --- a/tests/test_brukertiffimagingextactor.py +++ b/tests/test_brukertiffimagingextactor.py @@ -4,52 +4,77 @@ import numpy as np from hdmf.testing import TestCase +from natsort import natsorted from numpy.testing import assert_array_equal from tifffile import tifffile -from roiextractors import BrukerTiffImagingExtractor +from roiextractors import BrukerTiffMultiPlaneImagingExtractor, BrukerTiffSinglePlaneImagingExtractor from .setup_paths import OPHYS_DATA_PATH -class TestBrukerTiffExtractor(TestCase): +def _get_test_video(file_paths): + frames = [ + tifffile.memmap( + file, + mode="r", + _multifile=False, + ) + for file in file_paths + ] + return np.stack(frames, axis=0) + + +class TestBrukerTiffExtractorSinglePlaneCase(TestCase): @classmethod def setUpClass(cls): folder_path = str( OPHYS_DATA_PATH / "imaging_datasets" / "BrukerTif" / "NCCR32_2023_02_20_Into_the_void_t_series_baseline-000" ) + cls.available_streams = dict(channel_streams=["Ch2"]) cls.folder_path = folder_path - extractor = BrukerTiffImagingExtractor(folder_path=folder_path) + extractor = BrukerTiffSinglePlaneImagingExtractor(folder_path=folder_path) cls.extractor = extractor - file_paths = extractor._file_paths - frames = [] - for file in file_paths: - with tifffile.TiffFile(Path(folder_path) / file, _multifile=False) as tif: - frames.append(tif.asarray()) - cls.video = np.stack(frames, axis=0) + file_paths = natsorted(Path(folder_path).glob("*.ome.tif")) + cls.video = _get_test_video(file_paths=file_paths) # temporary directory for testing assertion when xml file is missing test_dir = tempfile.mkdtemp() cls.test_dir = test_dir - shutil.copy(Path(folder_path) / file_paths[0], Path(test_dir) / file_paths[0]) + shutil.copy(file_paths[0], Path(test_dir) / file_paths[0].name) @classmethod def tearDownClass(cls): # remove the temporary directory and its contents shutil.rmtree(cls.test_dir) + def test_stream_names(self): + self.assertEqual( + BrukerTiffSinglePlaneImagingExtractor.get_streams(folder_path=self.folder_path), self.available_streams + ) + + def test_incorrect_stream_name_raises(self): + exc_msg = f"The selected stream 'Ch1' is not in the available channel_streams '['Ch2']'!" + with self.assertRaisesWith(ValueError, exc_msg=exc_msg): + BrukerTiffSinglePlaneImagingExtractor(folder_path=self.folder_path, stream_name="Ch1") + def test_tif_files_are_missing_assertion(self): folder_path = "not a tiff path" exc_msg = f"The TIF image files are missing from '{folder_path}'." with self.assertRaisesWith(AssertionError, exc_msg=exc_msg): - BrukerTiffImagingExtractor(folder_path=folder_path) + BrukerTiffSinglePlaneImagingExtractor(folder_path=folder_path) + + def test_volumetric_extractor_cannot_be_used_for_non_volumetric_data(self): + exc_msg = "BrukerTiffMultiPlaneImagingExtractor is for volumetric imaging. For single imaging plane data use BrukerTiffSinglePlaneImagingExtractor." + with self.assertRaisesWith(AssertionError, exc_msg=exc_msg): + BrukerTiffMultiPlaneImagingExtractor(folder_path=self.folder_path) def test_xml_configuration_file_is_missing_assertion(self): folder_path = self.test_dir exc_msg = f"The XML configuration file is not found at '{folder_path}'." with self.assertRaisesWith(AssertionError, exc_msg=exc_msg): - BrukerTiffImagingExtractor(folder_path=folder_path) + BrukerTiffSinglePlaneImagingExtractor(folder_path=folder_path) def test_brukertiffextractor_image_size(self): self.assertEqual(self.extractor.get_image_size(), (512, 512)) @@ -58,7 +83,7 @@ def test_brukertiffextractor_num_frames(self): self.assertEqual(self.extractor.get_num_frames(), 10) def test_brukertiffextractor_sampling_frequency(self): - self.assertEqual(self.extractor.get_sampling_frequency(), 30.345939461428763) + self.assertEqual(self.extractor.get_sampling_frequency(), 29.873732099062256) def test_brukertiffextractor_channel_names(self): self.assertEqual(self.extractor.get_channel_names(), ["Ch2"]) @@ -70,22 +95,137 @@ def test_brukertiffextractor_dtype(self): self.assertEqual(self.extractor.get_dtype(), np.uint16) def test_brukertiffextractor_get_video(self): - assert_array_equal(self.extractor.get_video(), self.video) + video = self.extractor.get_video() + assert_array_equal(video, self.video) + self.assertEqual(video.dtype, np.uint16) + assert_array_equal(self.extractor.get_video(start_frame=0, end_frame=1), self.video[:1]) def test_brukertiffextractor_get_single_frame(self): assert_array_equal(self.extractor.get_frames(frame_idxs=[0]), self.video[0][np.newaxis, ...]) - def test_brukertiffextractor_get_video_multi_channel_assertion(self): - exc_msg = "The BrukerTiffImagingExtractor does not currently support multiple color channels." - with self.assertRaisesWith(NotImplementedError, exc_msg=exc_msg): - self.extractor.get_video(channel=1) - def test_brukertiffextractor_xml_metadata(self): - xml_metadata = self.extractor.xml_metadata +class TestBrukerTiffExtractorDualPlaneCase(TestCase): + @classmethod + def setUpClass(cls): + folder_path = str( + OPHYS_DATA_PATH / "imaging_datasets" / "BrukerTif" / "NCCR32_2022_11_03_IntoTheVoid_t_series-005" + ) + cls.folder_path = folder_path + cls.extractor = BrukerTiffMultiPlaneImagingExtractor(folder_path=folder_path) + + first_plane_file_paths = [ + f"{cls.folder_path}/NCCR32_2022_11_03_IntoTheVoid_t_series-005_Cycle0000{num + 1}_Ch2_000001.ome.tif" + for num in range(5) + ] + second_plane_file_paths = [ + f"{cls.folder_path}/NCCR32_2022_11_03_IntoTheVoid_t_series-005_Cycle0000{num + 1}_Ch2_000002.ome.tif" + for num in range(5) + ] + + cls.available_streams = dict( + channel_streams=["Ch2"], + plane_streams=dict(Ch2=["Ch2_000001", "Ch2_000002"]), + ) + cls.test_video = np.zeros((5, 512, 512, 2), dtype=np.uint16) + first_plane_video = _get_test_video(file_paths=first_plane_file_paths) + cls.test_video[..., 0] = first_plane_video + second_plane_video = _get_test_video(file_paths=second_plane_file_paths) + cls.test_video[..., 1] = second_plane_video - self.assertEqual(xml_metadata["version"], "5.6.64.400") - self.assertEqual(xml_metadata["date"], "2/20/2023 3:58:25 PM") - self.assertEqual(xml_metadata["framePeriod"], "0.032953338") + def test_stream_names(self): + self.assertEqual( + BrukerTiffMultiPlaneImagingExtractor.get_streams(folder_path=self.folder_path), self.available_streams + ) + + def test_brukertiffextractor_image_size(self): + self.assertEqual(self.extractor.get_image_size(), (512, 512, 2)) + + def test_brukertiffextractor_num_frames(self): + self.assertEqual(self.extractor.get_num_frames(), 5) + + def test_brukertiffextractor_sampling_frequency(self): + self.assertEqual(self.extractor.get_sampling_frequency(), 20.629515014336377) + + def test_brukertiffextractor_channel_names(self): + self.assertEqual(self.extractor.get_channel_names(), ["Ch2"]) + + def test_brukertiffextractor_num_channels(self): + self.assertEqual(self.extractor.get_num_channels(), 1) + + def test_brukertiffextractor_dtype(self): + self.assertEqual(self.extractor.get_dtype(), np.uint16) + + def test_incorrect_stream_with_disjoint_plane_raises(self): + exc_msg = ( + "The selected stream 'Ch2_000003' is not in the available plane_streams '['Ch2_000001', 'Ch2_000002']'!" + ) + with self.assertRaisesWith(ValueError, exc_msg=exc_msg): + BrukerTiffMultiPlaneImagingExtractor( + folder_path=self.folder_path, + stream_name="Ch2_000003", + ) + + def test_brukertiffextractor_get_video(self): + video = self.extractor.get_video() + assert_array_equal(video, self.test_video) + self.assertEqual(video.dtype, np.uint16) + assert_array_equal(self.extractor.get_video(start_frame=2, end_frame=4), self.test_video[2:4]) + + def test_brukertiffextractor_get_single_frame(self): + assert_array_equal(self.extractor.get_frames(frame_idxs=[0]), self.test_video[0][np.newaxis, ...]) + + +class TestBrukerTiffExtractorDualColorCase(TestCase): + @classmethod + def setUpClass(cls): + folder_path = str( + OPHYS_DATA_PATH / "imaging_datasets" / "BrukerTif" / "NCCR62_2023_07_06_IntoTheVoid_t_series_Dual_color-000" + ) + cls.folder_path = folder_path + cls.available_streams = dict(channel_streams=["Ch1", "Ch2"]) + cls.extractor = BrukerTiffSinglePlaneImagingExtractor(folder_path=cls.folder_path, stream_name="Ch1") + + file_paths = natsorted(Path(folder_path).glob("*.ome.tif")) + cls.test_video_ch1 = tifffile.TiffFile(file_paths[0]).asarray() + cls.test_video_ch2 = tifffile.TiffFile(file_paths[1]).asarray() + + def test_not_selecting_stream_raises(self): + exc_msg = "More than one recording stream is detected! Please specify which stream you wish to load with the `stream_name` argument. To see what streams are available, call `BrukerTiffSinglePlaneImagingExtractor.get_stream_names(folder_path=...)`." + with self.assertRaisesWith(ValueError, exc_msg=exc_msg): + BrukerTiffSinglePlaneImagingExtractor(folder_path=self.folder_path) + + def test_stream_names(self): + assert_array_equal( + BrukerTiffSinglePlaneImagingExtractor.get_streams(folder_path=self.folder_path), self.available_streams + ) + + def test_brukertiffextractor_image_size(self): + self.assertEqual(self.extractor.get_image_size(), (512, 512)) + + def test_brukertiffextractor_channel_names(self): + self.assertEqual(self.extractor.get_channel_names(), ["Ch1"]) + + def test_brukertiffextractor_dtype(self): + self.assertEqual(self.extractor.get_dtype(), np.uint16) + + def test_brukertiffextractor_sampling_frequency(self): + self.assertEqual(self.extractor.get_sampling_frequency(), 29.873615189896864) + + def test_brukertiffextractor_get_video(self): + assert_array_equal(self.extractor.get_video(start_frame=0, end_frame=1), self.test_video_ch1[:1]) + video = self.extractor.get_video() + assert_array_equal(video, self.test_video_ch1) + self.assertEqual(video.dtype, np.uint16) + + def test_brukertiffextractor_second_stream_get_video(self): + extractor = BrukerTiffSinglePlaneImagingExtractor(folder_path=self.folder_path, stream_name="Ch2") + video = extractor.get_video() + assert_array_equal(extractor.get_video(), self.test_video_ch2) + self.assertEqual(video.dtype, np.uint16) + + def test_brukertiffextractor_second_stream_sampling_frequency(self): + extractor = BrukerTiffSinglePlaneImagingExtractor(folder_path=self.folder_path, stream_name="Ch2") self.assertEqual( - xml_metadata["micronsPerPixel"], [{"XAxis": "1.1078125"}, {"YAxis": "1.1078125"}, {"ZAxis": "5"}] + self.extractor.get_sampling_frequency(), + extractor.get_sampling_frequency(), )