diff --git a/CHANGELOG.md b/CHANGELOG.md index ed349ec2..c894813b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # Upcoming +### Improvements +* Improved xml parsing with Bruker [PR #267](https://github.com/catalystneuro/roiextractors/pull/267) + # v0.5.5 diff --git a/src/roiextractors/extractors/tiffimagingextractors/brukertiffimagingextractor.py b/src/roiextractors/extractors/tiffimagingextractors/brukertiffimagingextractor.py index e307d826..91197214 100644 --- a/src/roiextractors/extractors/tiffimagingextractors/brukertiffimagingextractor.py +++ b/src/roiextractors/extractors/tiffimagingextractors/brukertiffimagingextractor.py @@ -65,9 +65,15 @@ def _determine_imaging_is_volumetric(folder_path: PathType) -> bool: is_volumetric: bool True if the imaging is volumetric (multiplane), False otherwise (single plane). """ - xml_root = _parse_xml(folder_path=folder_path) - z_device_element = xml_root.find(".//PVStateValue[@key='zDevice']") - is_volumetric = bool(int(z_device_element.attrib["value"])) + folder_path = Path(folder_path) + xml_file_path = folder_path / f"{folder_path.name}.xml" + assert xml_file_path.is_file(), f"The XML configuration file is not found at '{xml_file_path}'." + + is_volumetric = False + for event, elem in ElementTree.iterparse(xml_file_path, events=("start",)): + if elem.tag == "PVStateValue" and elem.attrib.get("key") == "zDevice": + is_volumetric = bool(int(elem.attrib["value"])) + break # Stop parsing as we've found the required element return is_volumetric @@ -107,25 +113,52 @@ def get_streams(cls, folder_path: PathType) -> dict: """ natsort = get_package(package_name="natsort", installation_instructions="pip install natsort") - xml_root = _parse_xml(folder_path=folder_path) - channel_names = [file.attrib["channelName"] for file in xml_root.findall(".//File")] - unique_channel_names = natsort.natsorted(set(channel_names)) + folder_path = Path(folder_path) + xml_file_path = folder_path / f"{folder_path.name}.xml" + assert xml_file_path.is_file(), f"The XML configuration file is not found at '{folder_path}'." + + channel_names = set() + channel_ids = set() + file_names = [] + + # Parse the XML file iteratively to find the first Sequence element + first_sequence_element = None + for _, elem in ElementTree.iterparse(xml_file_path, events=("end",)): + if elem.tag == "Sequence": + first_sequence_element = elem + break + + if first_sequence_element is None: + raise ValueError("No Sequence element found in the XML configuration file. Can't get streams") + + # Then in the first Sequence we find all the Frame elements + if first_sequence_element is not None: + # Iterate over all Frame elements within the first Sequence + frame_elements = first_sequence_element.findall(".//Frame") + for frame_elemenet in frame_elements: + # Iterate over all File elements within each Frame + for file_elem in frame_elemenet.findall("File"): + channel_names.add(file_elem.attrib["channelName"]) + channel_ids.add(file_elem.attrib["channel"]) + file_names.append(file_elem.attrib["filename"]) + + unique_channel_names = natsort.natsorted(channel_names) + unique_channel_ids = natsort.natsorted(channel_ids) + streams = dict(channel_streams=unique_channel_names) streams["plane_streams"] = dict() + if not _determine_imaging_is_volumetric(folder_path=folder_path): return streams - # The "channelName" can be any name that the experimenter sets (e.g. 'Ch1', 'Ch2', 'Green', 'Red') - # Use the identifier of a channel "channel" (e.g. 1, 2) to match it to the file name - channel_ids = [file.attrib["channel"] for file in xml_root.findall(".//File")] - unique_channel_ids = natsort.natsorted(set(channel_ids)) + for channel_id, channel_name in zip(unique_channel_ids, unique_channel_names): plane_naming_pattern = rf"(?PCh{channel_id}_\d+)" - plane_stream_names = [ - re.search(plane_naming_pattern, file.attrib["filename"])["stream_name"] - for file in xml_root.findall(f".//File") - ] + regular_expression_matches = [re.search(plane_naming_pattern, filename) for filename in file_names] + plane_stream_names = [matches["stream_name"] for matches in regular_expression_matches if matches] + unique_plane_stream_names = natsort.natsorted(set(plane_stream_names)) streams["plane_streams"][channel_name] = unique_plane_stream_names + return streams def __init__( diff --git a/tests/test_brukertiffimagingextactor.py b/tests/test_brukertiffimagingextactor.py index a1e5361b..a975a3c1 100644 --- a/tests/test_brukertiffimagingextactor.py +++ b/tests/test_brukertiffimagingextactor.py @@ -133,9 +133,9 @@ def setUpClass(cls): cls.test_video[..., 1] = second_plane_video def test_stream_names(self): - self.assertEqual( - BrukerTiffMultiPlaneImagingExtractor.get_streams(folder_path=self.folder_path), self.available_streams - ) + found_streams = BrukerTiffMultiPlaneImagingExtractor.get_streams(folder_path=self.folder_path) + expected_streams = self.available_streams + self.assertEqual(found_streams, expected_streams) def test_brukertiffextractor_image_size(self): self.assertEqual(self.extractor.get_image_size(), (512, 512, 2))