Skip to content

Commit

Permalink
Merge pull request #267 from catalystneuro/improve_bruker_tiff
Browse files Browse the repository at this point in the history
Improve Bruker XML parsing by using iterparse instead of parsing the complete XML file
  • Loading branch information
h-mayorquin authored Dec 19, 2023
2 parents 2c08f65 + 518baca commit 70b629d
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 17 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Upcoming

### Improvements
* Improved Bruker XML parsing by using `iterparse` to stop early instead of parsing the complete XML file [PR #267](https://github.com/catalystneuro/roiextractors/pull/267)


# v0.5.5

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,15 @@ def _determine_imaging_is_volumetric(folder_path: PathType) -> bool:
is_volumetric: bool
True if the imaging is volumetric (multiplane), False otherwise (single plane).
"""
xml_root = _parse_xml(folder_path=folder_path)
z_device_element = xml_root.find(".//PVStateValue[@key='zDevice']")
is_volumetric = bool(int(z_device_element.attrib["value"]))
folder_path = Path(folder_path)
xml_file_path = folder_path / f"{folder_path.name}.xml"
assert xml_file_path.is_file(), f"The XML configuration file is not found at '{xml_file_path}'."

is_volumetric = False
for event, elem in ElementTree.iterparse(xml_file_path, events=("start",)):
if elem.tag == "PVStateValue" and elem.attrib.get("key") == "zDevice":
is_volumetric = bool(int(elem.attrib["value"]))
break # Stop parsing as we've found the required element

return is_volumetric

Expand Down Expand Up @@ -107,25 +113,52 @@ def get_streams(cls, folder_path: PathType) -> dict:
"""
natsort = get_package(package_name="natsort", installation_instructions="pip install natsort")

xml_root = _parse_xml(folder_path=folder_path)
channel_names = [file.attrib["channelName"] for file in xml_root.findall(".//File")]
unique_channel_names = natsort.natsorted(set(channel_names))
folder_path = Path(folder_path)
xml_file_path = folder_path / f"{folder_path.name}.xml"
assert xml_file_path.is_file(), f"The XML configuration file is not found at '{folder_path}'."

channel_names = set()
channel_ids = set()
file_names = []

# Parse the XML file iteratively to find the first Sequence element
first_sequence_element = None
for _, elem in ElementTree.iterparse(xml_file_path, events=("end",)):
if elem.tag == "Sequence":
first_sequence_element = elem
break

if first_sequence_element is None:
raise ValueError("No Sequence element found in the XML configuration file. Can't get streams")

# Then in the first Sequence we find all the Frame elements
if first_sequence_element is not None:
# Iterate over all Frame elements within the first Sequence
frame_elements = first_sequence_element.findall(".//Frame")
for frame_elemenet in frame_elements:
# Iterate over all File elements within each Frame
for file_elem in frame_elemenet.findall("File"):
channel_names.add(file_elem.attrib["channelName"])
channel_ids.add(file_elem.attrib["channel"])
file_names.append(file_elem.attrib["filename"])

unique_channel_names = natsort.natsorted(channel_names)
unique_channel_ids = natsort.natsorted(channel_ids)

streams = dict(channel_streams=unique_channel_names)
streams["plane_streams"] = dict()

if not _determine_imaging_is_volumetric(folder_path=folder_path):
return streams
# The "channelName" can be any name that the experimenter sets (e.g. 'Ch1', 'Ch2', 'Green', 'Red')
# Use the identifier of a channel "channel" (e.g. 1, 2) to match it to the file name
channel_ids = [file.attrib["channel"] for file in xml_root.findall(".//File")]
unique_channel_ids = natsort.natsorted(set(channel_ids))

for channel_id, channel_name in zip(unique_channel_ids, unique_channel_names):
plane_naming_pattern = rf"(?P<stream_name>Ch{channel_id}_\d+)"
plane_stream_names = [
re.search(plane_naming_pattern, file.attrib["filename"])["stream_name"]
for file in xml_root.findall(f".//File")
]
regular_expression_matches = [re.search(plane_naming_pattern, filename) for filename in file_names]
plane_stream_names = [matches["stream_name"] for matches in regular_expression_matches if matches]

unique_plane_stream_names = natsort.natsorted(set(plane_stream_names))
streams["plane_streams"][channel_name] = unique_plane_stream_names

return streams

def __init__(
Expand Down
6 changes: 3 additions & 3 deletions tests/test_brukertiffimagingextactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ def setUpClass(cls):
cls.test_video[..., 1] = second_plane_video

def test_stream_names(self):
self.assertEqual(
BrukerTiffMultiPlaneImagingExtractor.get_streams(folder_path=self.folder_path), self.available_streams
)
found_streams = BrukerTiffMultiPlaneImagingExtractor.get_streams(folder_path=self.folder_path)
expected_streams = self.available_streams
self.assertEqual(found_streams, expected_streams)

def test_brukertiffextractor_image_size(self):
self.assertEqual(self.extractor.get_image_size(), (512, 512, 2))
Expand Down

0 comments on commit 70b629d

Please sign in to comment.