diff --git a/xnat_ingest/cli/stage.py b/xnat_ingest/cli/stage.py index c6ac8ae..d184401 100644 --- a/xnat_ingest/cli/stage.py +++ b/xnat_ingest/cli/stage.py @@ -6,7 +6,7 @@ from tqdm import tqdm from xnat_ingest.cli.base import cli from xnat_ingest.session import ImagingSession -from arcana.xnat import Xnat +from frametree.xnat import Xnat from xnat_ingest.utils import ( DicomField, AssociatedFiles, @@ -30,31 +30,76 @@ are uploaded to XNAT """, ) -@click.argument("dicoms_path", type=str, envvar="XNAT_INGEST_STAGE_DICOMS_PATH") +@click.argument("files_path", type=str, envvar="XNAT_INGEST_STAGE_DICOMS_PATH") @click.argument( "staging_dir", type=click.Path(path_type=Path), envvar="XNAT_INGEST_STAGE_DIR" ) +@click.option( + "--datatype", + type=str, + metavar="", + multiple=True, + default="medimage/dicom-series", + envvar="XNAT_INGEST_STAGE_DATATYPE", + help="The datatype of the primary files to to upload", +) @click.option( "--project-field", - type=DicomField, + type=str, default="StudyID", envvar="XNAT_INGEST_STAGE_PROJECT", - help=("The keyword or tag of the DICOM field to extract the XNAT project ID from "), + help=("The keyword of the metadata field to extract the XNAT project ID from "), ) @click.option( "--subject-field", - type=DicomField, + type=str, default="PatientID", envvar="XNAT_INGEST_STAGE_SUBJECT", - help=("The keyword or tag of the DICOM field to extract the XNAT subject ID from "), + help=("The keyword of the metadata field to extract the XNAT subject ID from "), ) @click.option( "--visit-field", - type=DicomField, + type=str, default="AccessionNumber", + envvar="XNAT_INGEST_STAGE_VISIT", + help=( + "The keyword of the metadata field to extract the XNAT imaging session ID from " + ), +) +@click.option( + "--session-field", + type=str, + default=None, envvar="XNAT_INGEST_STAGE_SESSION", help=( - "The keyword or tag of the DICOM field to extract the XNAT imaging session ID from " + "The keyword of the metadata field to extract the XNAT imaging session ID from " + ), +) +@click.option( + "--scan-id-field", + type=str, + default="SeriesNumber", + envvar="XNAT_INGEST_STAGE_SCAN_ID", + help=( + "The keyword of the metadata field to extract the XNAT imaging scan ID from " + ), +) +@click.option( + "--scan-desc-field", + type=str, + default="SeriesDescription", + envvar="XNAT_INGEST_STAGE_SCAN_DESC", + help=( + "The keyword of the metadata field to extract the XNAT imaging scan description from " + ), +) +@click.option( + "--resource-field", + type=str, + default="ImageType", + envvar="XNAT_INGEST_STAGE_RESOURCE", + help=( + "The keyword of the metadata field to extract the XNAT imaging resource ID from " ), ) @click.option( @@ -68,6 +113,7 @@ type=AssociatedFiles.cli_type, nargs=2, default=None, + multiple=True, envvar="XNAT_INGEST_STAGE_ASSOCIATED", metavar=" ", help=( @@ -181,12 +227,17 @@ type=bool, ) def stage( - dicoms_path: str, + files_path: str, staging_dir: Path, + datatype: str, associated_files: AssociatedFiles, - project_field: DicomField, - subject_field: DicomField, - visit_field: DicomField, + project_field: str, + subject_field: str, + visit_field: str, + session_field: str | None, + scan_id_field: str, + scan_desc_field: str, + resource_field: str, project_id: str | None, delete: bool, log_level: str, @@ -219,7 +270,10 @@ def stage( else: project_list = None - msg = f"Loading DICOM sessions from '{dicoms_path}'" + if session_field is None and datatype == "medimage/dicom-series": + session_field = "StudyInstanceUID" + + msg = f"Loading {datatype} sessions from '{files_path}'" if associated_files: msg += f" with associated files selected from '{associated_files.glob}'" @@ -228,17 +282,21 @@ def stage( logger.info(msg) - sessions = ImagingSession.from_dicoms( - dicoms_path=dicoms_path, + sessions = ImagingSession.from_paths( + files_path=files_path, project_field=project_field, subject_field=subject_field, visit_field=visit_field, + session_field=session_field, + scan_id_field=scan_id_field, + scan_desc_field=scan_desc_field, + resource_field=resource_field, project_id=project_id, ) logger.info("Staging sessions to '%s'", str(staging_dir)) - for session in tqdm(sessions, f"Staging DICOM sessions found in '{dicoms_path}'"): + for session in tqdm(sessions, f"Staging DICOM sessions found in '{files_path}'"): try: session_staging_dir = staging_dir.joinpath(*session.staging_relpath) if session_staging_dir.exists(): diff --git a/xnat_ingest/dicom.py b/xnat_ingest/dicom.py index 8ae6672..2708b38 100644 --- a/xnat_ingest/dicom.py +++ b/xnat_ingest/dicom.py @@ -1,18 +1,20 @@ import typing as ty import subprocess as sp + # import re import pydicom + # from fileformats.core import FileSet # from fileformats.application import Dicom # from fileformats.extras.application.medical import dicom_read_metadata - +dcmedit_path: ty.Optional[str] try: dcmedit_path = sp.check_output("which dcmedit", shell=True).decode("utf-8").strip() except sp.CalledProcessError: dcmedit_path = None - +dcminfo_path: ty.Optional[str] try: dcminfo_path = sp.check_output("which dcminfo", shell=True).decode("utf-8").strip() except sp.CalledProcessError: @@ -20,11 +22,14 @@ def tag2keyword(tag: ty.Tuple[str, str]) -> str: - return pydicom.datadict.dictionary_keyword(tag) + return pydicom.datadict.dictionary_keyword((int(tag[0]), int(tag[1]))) def keyword2tag(keyword: str) -> ty.Tuple[str, str]: - tag_str = hex(pydicom.datadict.tag_for_keyword(keyword))[2:] + tag = pydicom.datadict.tag_for_keyword(keyword) + if not tag: + raise ValueError(f"Could not find tag for keyword '{keyword}'") + tag_str = hex(tag)[2:] return (f"{tag_str[:-4].zfill(4)}", tag_str[-4:]) @@ -49,27 +54,3 @@ def __init__(self, keyword_or_tag): def __str__(self): return f"'{self.keyword}' field ({','.join(self.tag)})" - - -# @FileSet.read_metadata.register -# def mrtrix_dicom_read_metadata( -# dcm: Dicom, selected_keys: ty.Optional[ty.Sequence[str]] = None -# ) -> ty.Mapping[str, ty.Any]: -# if dcminfo_path is None or selected_keys is None: -# return dicom_read_metadata(dcm, selected_keys) - -# tags = [keyword2tag(k) for k in selected_keys] -# tag_str = " ".join(f"-t {t[0]} {t[1]}" for t in tags) -# cmd = f"dcminfo {tag_str} {dcm.fspath}" -# line_re = re.compile(r"\[([0-9A-F]{4}),([0-9A-F]{4})] (.*)") -# dcminfo_output = sp.check_output(cmd, shell=True).decode("utf-8") -# metadata = {} -# for line in dcminfo_output.splitlines(): -# match = line_re.match(line) -# if not match: -# continue -# t1, t2, val = match.groups() -# key = tag2keyword((t1, t2)) -# val = val.strip() -# metadata[key] = val -# return metadata diff --git a/xnat_ingest/exceptions.py b/xnat_ingest/exceptions.py index 9b6631a..e1b1711 100644 --- a/xnat_ingest/exceptions.py +++ b/xnat_ingest/exceptions.py @@ -1,5 +1,3 @@ - - class UnsupportedModalityError(Exception): def __init__(self, msg): self.msg = msg @@ -10,7 +8,7 @@ def __init__(self, msg): self.msg = msg -class DicomParseError(StagingError): +class ImagingSessionParseError(StagingError): def __init__(self, msg): self.msg = msg diff --git a/xnat_ingest/session.py b/xnat_ingest/session.py index df8d4e9..468522f 100644 --- a/xnat_ingest/session.py +++ b/xnat_ingest/session.py @@ -5,28 +5,29 @@ import os.path import subprocess as sp from functools import cached_property +from typing_extensions import Self import shutil from copy import deepcopy import yaml from tqdm import tqdm import attrs from itertools import chain -from collections import defaultdict +from collections import defaultdict, Counter from pathlib import Path import pydicom from fileformats.generic import File from fileformats.application import Dicom from fileformats.medimage import DicomSeries from fileformats.core import from_paths, FileSet, DataType, from_mime, to_mime -from arcana.core.data.set import Dataset -from arcana.core.data.space import DataSpace -from arcana.core.data.row import DataRow -from arcana.core.data.store import DataStore -from arcana.core.data.entry import DataEntry -from arcana.core.data.tree import DataTree -from arcana.core.exceptions import ArcanaDataMatchError -from .exceptions import DicomParseError, StagingError -from .utils import add_exc_note, transform_paths, DicomField, AssociatedFiles +from frametree.core.frameset import FrameSet # type: ignore[import-untyped] +from frametree.core.axes import Axes # type: ignore[import-untyped] +from frametree.core.row import DataRow # type: ignore[import-untyped] +from frametree.core.store import Store # type: ignore[import-untyped] +from frametree.core.entry import DataEntry # type: ignore[import-untyped] +from frametree.core.tree import DataTree # type: ignore[import-untyped] +from frametree.core.exceptions import FrameTreeDataMatchError # type: ignore[import-untyped] +from .exceptions import ImagingSessionParseError, StagingError +from .utils import add_exc_note, transform_paths, AssociatedFiles from .dicom import dcmedit_path import random import string @@ -57,6 +58,9 @@ def scans_converter( scans: ty.Union[ty.Sequence[ImagingScan], ty.Dict[str, ImagingScan]] ): if isinstance(scans, ty.Sequence): + duplicates = [i for i, c in Counter(s.id for s in scans).items() if c > 1] + if duplicates: + raise ValueError(f"Found duplicate scan IDs in list of scans: {duplicates}") scans = {s.id: s for s in scans} return scans @@ -121,14 +125,14 @@ def dicom_dirs(self) -> ty.List[Path]: def select_resources( self, - dataset: ty.Optional[Dataset], + dataset: ty.Optional[FrameSet], always_include: ty.Sequence[str] = (), ) -> ty.Iterator[ty.Tuple[str, str, str, FileSet]]: """Returns selected resources that match the columns in the dataset definition Parameters ---------- - dataset : Dataset + dataset : FrameSet Arcana dataset definition always_include : sequence[str] mime-types or "mime-like" (see https://arcanaframework.github.io/fileformats/) @@ -151,14 +155,18 @@ def select_resources( "Either 'dataset' or 'always_include' must be specified to select " f"appropriate resources to upload from {self.name} session" ) - store = MockDataStore(self) + store = MockStore(self) uploaded = set() for mime_like in always_include: if mime_like == "all": fileformat = FileSet else: - fileformat = from_mime(mime_like) + fileformat = from_mime(mime_like) # type: ignore[assignment] + if isinstance(fileformat, FileSet): + raise ValueError( + f"{mime_like!r} does not correspond to a file format ({fileformat})" + ) for scan in self.scans.values(): for resource_name, fileset in scan.resources.items(): if isinstance(fileset, fileformat): @@ -168,7 +176,7 @@ def select_resources( for column in dataset.columns.values(): try: entry = column.match_entry(store.row) - except ArcanaDataMatchError as e: + except FrameTreeDataMatchError as e: raise StagingError( f"Did not find matching entry for {column} column in {dataset} from " f"{self.name} session" @@ -213,32 +221,52 @@ def metadata(self): return collated @classmethod - def from_dicoms( + def from_paths( cls, - dicoms_path: str | Path, - project_field: DicomField = DicomField("StudyID"), - subject_field: DicomField = DicomField("PatientID"), - visit_field: DicomField = DicomField("AccessionNumber"), + files_path: str | Path, + datatypes: ty.Union[ + ty.Type[FileSet], ty.Sequence[ty.Type[FileSet]] + ] = DicomSeries, + project_field: str = "StudyID", + subject_field: str = "PatientID", + visit_field: str = "AccessionNumber", + scan_id_field: str = "SeriesNumber", + scan_desc_field: str = "SeriesDescription", + resource_field: str = "ImageType", + session_field: str | None = "StudyInstanceUID", project_id: str | None = None, - ) -> ty.List["ImagingSession"]: + ) -> ty.List[Self]: """Loads all imaging sessions from a list of DICOM files Parameters ---------- - dicoms_path : str or Path - Path to a directory containging the DICOMS to load the sessions from, or a + files_path : str or Path + Path to a directory containging the resources to load the sessions from, or a glob string that selects the paths project_field : str - the name of the DICOM field that is to be interpreted as the corresponding - XNAT project + the metadata field that contains the XNAT project ID for the imaging session, + by default "StudyID" subject_field : str - the name of the DICOM field that is to be interpreted as the corresponding - XNAT project + the metadata field that contains the XNAT subject ID for the imaging session, + by default "PatientID" visit_field : str - the name of the DICOM field that is to be interpreted as the corresponding - XNAT project + the metadata field that contains the XNAT visit ID for the imaging session, + by default "AccessionNumber" + scan_id_field: str + the metadata field that contains the XNAT scan ID for the imaging session, + by default "SeriesNumber" + scan_desc_field: str + the metadata field that contains the XNAT scan description for the imaging session, + by default "SeriesDescription" + resource_field: str + the metadata field that contains the XNAT resource ID for the imaging session, + by default "ImageType" + session_field : str, optional + the name of the metadata field that uniquely identifies the session, used + to check that the values extracted from the IDs across the DICOM scans are + consistent across DICOM files within the session, by default "StudyInstanceUID" project_id : str - Override the project ID loaded from the DICOM header (useful when invoking + Override the project ID loaded from the metadata (useful when invoking manually) Returns @@ -248,99 +276,160 @@ def from_dicoms( Raises ------ - DicomParseError + ImagingSessionParseError if values extracted from IDs across the DICOM scans are not consistent across DICOM files within the session """ - if isinstance(dicoms_path, Path) or "*" not in dicoms_path: - dicoms_path = Path(dicoms_path) - if not dicoms_path.exists(): - raise ValueError(f"Provided DICOMs path '{dicoms_path}' does not exist") - if dicoms_path.is_dir(): - dicom_fspaths = list(Path(dicoms_path).iterdir()) + if isinstance(files_path, Path) or "*" not in files_path: + files_path = Path(files_path) + if not files_path.exists(): + raise ValueError(f"Provided DICOMs path '{files_path}' does not exist") + if files_path.is_dir(): + fspaths = list(Path(files_path).iterdir()) else: - dicom_fspaths = [dicoms_path] + fspaths = [files_path] else: - dicom_fspaths = [Path(p) for p in glob(dicoms_path, recursive=True)] + fspaths = [Path(p) for p in glob(files_path, recursive=True)] + + from_paths_kwargs = {} + if datatypes is DicomSeries: + from_paths_kwargs["specific_tags"] = [ + project_field, + subject_field, + visit_field, + session_field, + scan_id_field, + scan_desc_field, + resource_field, + ] + + if not isinstance(datatypes, ty.Sequence): + datatypes = [datatypes] # Sort loaded series by StudyInstanceUID (imaging session) - logger.info("Loading DICOM series from %s", str(dicoms_path)) - dicom_sessions = defaultdict(list) - for series in from_paths( - dicom_fspaths, - DicomSeries, + logger.info(f"Loading {datatypes} from {files_path}...") + resources = from_paths( + fspaths, + *datatypes, ignore=".*", - selected_keys=[ - "SeriesNumber", - "SeriesDescription", - "StudyInstanceUID", - "SOPInstanceUID", # used in ordering the contents of the dicom series - project_field.keyword, - subject_field.keyword, - visit_field.keyword, - ], + **from_paths_kwargs, # type: ignore[arg-type] + ) + sessions: ty.Dict[ty.Tuple[str, str, str] | str, Self] = {} + multiple_sessions: ty.DefaultDict[str, ty.Set[ty.Tuple[str, str, str]]] = ( + defaultdict(set) + ) + multiple_scan_types: ty.DefaultDict[ + ty.Tuple[str, str, str, str], ty.Set[str] + ] = defaultdict(set) + multiple_resources: ty.DefaultDict[ + ty.Tuple[str, str, str, str, str], ty.Set[str] + ] = defaultdict(set) + for resource in tqdm( + resources, + "Sorting resources into XNAT tree structure...", ): - # Restrict the metadata fields that are loaded (others are ignored), - # for performance reasons - dicom_sessions[series.metadata["StudyInstanceUID"]].append(series) - - # Construct sessions from sorted series - logger.info("Searching for associated files ") - sessions = [] - for session_dicom_series in dicom_sessions.values(): - - def get_id(field): - ids = set(s.metadata.get(field.keyword) for s in session_dicom_series) - ids.discard(None) - if ids: - if len(ids) > 1: - raise DicomParseError( - f"Multiple values for '{field}' tag found across scans in session: " - f"{session_dicom_series}" - ) - id_ = next(iter(ids)) - if isinstance(id_, list): - raise DicomParseError( - f"Multiple values for '{field}' tag found within scans in session: " - f"{session_dicom_series}" + session_uid = resource.metadata[session_field] if session_field else None + + def get_id(field_type: str, field_name: str) -> str: + try: + value = resource.metadata[field_name] + except KeyError: + if session_uid and field_type in ("project", "subject", "visit"): + value = ( + "INVALID-MISSING-" + + field_type.upper() + + "-" + + "".join( + random.choices( + string.ascii_letters + string.digits, k=8 + ) + ) ) - id_ = cls.id_escape_re.sub("", id_) - else: - logger.warning( - "Did not find %s field in DICOM series %s", - field.keyword, - session_dicom_series, + raise ImagingSessionParseError( + f"Did not find '{field_name}' field in {resource}, " + "cannot uniquely identify the resource" + ) + return value + + if not project_id: + project_id = get_id("project", project_field) + subject_id = get_id("subject", subject_field) + visit_id = get_id("visit", visit_field) + scan_id = get_id("scan", scan_id_field) + scan_type = get_id("scan type", scan_desc_field) + resource_id = get_id("resource", resource_field) + + if session_uid is None: + session_uid = (project_id, subject_id, visit_id) + try: + session = sessions[session_uid] + except KeyError: + session = cls( + project_id=project_id, + subject_id=subject_id, + visit_id=visit_id, + ) + sessions[session_uid] = session + else: + if (session.project_id, session.subject_id, session.visit_id) != ( + project_id, + subject_id, + visit_id, + ): + # Record all issues with the session IDs for raising exception at the end + multiple_sessions[session_uid].add( + (project_id, subject_id, visit_id) ) - id_ = None - if not id_: - id_ = "INVALID-MISSING-ID-" + "".join( - random.choices(string.ascii_letters + string.digits, k=8) + multiple_sessions[session_uid].add( + (session.project_id, session.subject_id, session.visit_id) ) - return id_ - scans = [] - for dicom_series in session_dicom_series: - series_description = dicom_series.metadata["SeriesDescription"] - if isinstance(series_description, list): - series_description = series_description[0] - scans.append( - ImagingScan( - id=str(dicom_series.metadata["SeriesNumber"]), - type=str(series_description), - resources={"DICOM": dicom_series}, - ) + try: + scan = session.scans[scan_id] + except KeyError: + scan = ImagingScan(id=scan_id, type=scan_type, resources={}) + session.scans[scan_id] = scan + else: + if scan.type != scan_type: + # Record all issues with the scan types for raising exception at the end + multiple_scan_types[ + (project_id, subject_id, visit_id, scan_id) + ].add(scan_type) + + if resource_id in scan.resources: + multiple_resources[ + (project_id, subject_id, visit_id, scan_id, scan_type) + ].add(resource_id) + scan.resources[resource_id] = resource + + if multiple_sessions: + raise ImagingSessionParseError( + "Multiple sessions found with the same project/subject/visit ID triplets: " + + "\n".join( + f"{i} -> {p}:{s}:{v}" for i, (p, s, v) in multiple_sessions.items() ) + ) - sessions.append( - cls( - scans=scans, - project_id=(project_id if project_id else get_id(project_field)), - subject_id=get_id(subject_field), - visit_id=get_id(visit_field), + if multiple_scan_types: + raise ImagingSessionParseError( + "Multiple scans found with the same project/subject/visit/scan ID " + "quadruplets: " + + "\n".join( + f"{p}:{s}:{v}:{sc} -> " + ", ".join(st) + for (p, s, v, sc), st in multiple_scan_types.items() + ) + ) + if multiple_resources: + raise ImagingSessionParseError( + "Multiple resources found with the same project/subject/visit/scan/resource " + "ID quintuplets: " + + "\n".join( + f"{p}:{s}:{v}:{sc}:{r} -> " + ", ".join(rs) + for (p, s, v, sc, r), rs in multiple_resources.items() ) ) - return sessions + return list(sessions.values()) @classmethod def load( @@ -391,7 +480,7 @@ def load( id=scan_id, type=scan_dict["type"], resources={ - n: from_mime(d["datatype"])( + n: from_mime(d["datatype"])( # type: ignore[call-arg, misc] session_dir.joinpath(*p.split("/")) for p in d["fspaths"] ) @@ -481,7 +570,7 @@ def save(self, save_dir: Path, just_manifest: bool = False) -> "ImagingSession": ) saved.scans[scan.id].resources[resource_name] = fileset resources_dict[resource_name] = { - "datatype": to_mime(fileset, official=False), + "datatype": to_mime(type(fileset), official=False), "fspaths": [ # Ensure it is a relative path using POSIX forward slashes str(p.absolute().relative_to(session_dir)).replace("\\", "/") @@ -508,7 +597,7 @@ def stage( deidentify: bool = True, project_list: ty.Optional[ty.List[str]] = None, spaces_to_underscores: bool = False, - ) -> "ImagingSession": + ) -> Self: r"""Stages and deidentifies files by removing the fields listed `FIELDS_TO_ANONYMISE` and replacing birth date with 01/01/ and returning new imaging session @@ -567,7 +656,7 @@ def stage( for scan in tqdm( self.scans.values(), f"Staging DICOM sessions to {session_dir}" ): - staged_resources = {} + staged_resources: ty.Dict[str, FileSet] = {} for resource_name, fileset in scan.resources.items(): # Ensure scan type is a valid directory name scan_dir = session_dir / f"{scan.id}-{scan.type}" / resource_name @@ -842,7 +931,7 @@ def deidentify_dicom( @attrs.define -class MockDataStore(DataStore): +class MockStore(Store): """Mock data store so we can use the column.match_entry method on the "entries" in the data row """ @@ -853,7 +942,7 @@ class MockDataStore(DataStore): def row(self): return DataRow( ids={DummySpace._: None}, - dataset=Dataset(id=None, store=self, hierarchy=[], space=DummySpace), + dataset=FrameSet(id=None, store=self, hierarchy=[], space=DummySpace), frequency=DummySpace._, ) @@ -957,5 +1046,5 @@ def create_entry(self, path: str, datatype: type, row: DataRow) -> DataEntry: raise NotImplementedError -class DummySpace(DataSpace): +class DummySpace(Axes): _ = 0b0