From dc819d807f53d2ed899a8b869a6878a40e323305 Mon Sep 17 00:00:00 2001 From: Tom Close Date: Fri, 24 Nov 2023 13:54:24 +1100 Subject: [PATCH] changed dicoms to resources --- xnat_ingest/cli/stage.py | 10 ++- xnat_ingest/dicom.py | 115 ------------------------------ xnat_ingest/session.py | 103 +++++++++++++------------- xnat_ingest/tests/test_session.py | 2 +- 4 files changed, 60 insertions(+), 170 deletions(-) delete mode 100644 xnat_ingest/dicom.py diff --git a/xnat_ingest/cli/stage.py b/xnat_ingest/cli/stage.py index 1c44dc9..52d467f 100644 --- a/xnat_ingest/cli/stage.py +++ b/xnat_ingest/cli/stage.py @@ -93,9 +93,7 @@ default="info", type=str, envvar="XNAT_INGEST_LOGLEVEL", - help=( - "The level of the logging printed to stdout" - ) + help=("The level of the logging printed to stdout"), ) @click.option( "--log-file", @@ -153,7 +151,6 @@ def stage( mail_server: MailServer, raise_errors: bool, ): - set_logger_handling(log_level, log_file, log_emails, mail_server) logger.info( @@ -165,7 +162,6 @@ def stage( sessions = ImagingSession.construct( dicoms_path=dicoms_path, associated_files_pattern=associated, - assoc_files_identification=assoc_identification, project_field=project_field, subject_field=subject_field, session_field=session_field, @@ -186,7 +182,9 @@ def stage( continue session_staging_dir.mkdir(exist_ok=True) # Deidentify files and save them to the staging directory - staged_session = session.deidentify(session_staging_dir) + staged_session = session.stage( + session_staging_dir, assoc_files_identification=assoc_identification + ) staged_session.save(session_staging_dir) if delete: session.delete() diff --git a/xnat_ingest/dicom.py b/xnat_ingest/dicom.py deleted file mode 100644 index 0594af7..0000000 --- a/xnat_ingest/dicom.py +++ /dev/null @@ -1,115 +0,0 @@ -import typing as ty -from collections import defaultdict -from pathlib import Path -from copy import copy -import attrs - - -@attrs.define -class DicomScan: - - Tag = ty.NewType("Tag", ty.Tuple[str, str]) - - modality: str - files: list[Path] = attrs.field(factory=list) - ids: dict[str, str] = attrs.field(factory=dict) - - DEFAULT_ID_FIELDS = { - "project": "StudyID", - "subject": "PatientID", - "session": "AccessionNumber", - } - - @classmethod - def from_files( - cls, - dicom_files: ty.Sequence[Path], - ids: ty.Optional[dict[str, str]] = None, - **id_fields: dict[str, ty.Union[str, Tag, tuple[str, ty.Callable], tuple[Tag, ty.Callable]]], - ) -> "ty.Sequence[DicomScan]": - """Loads a series of DICOM scans from a list of dicom files, grouping the files - by series number and pulling various session-identifying fields from the headers - - Parameters - ---------- - dicom_files: Sequence[Path] - The dicom files to sort - ids : dict[str, str] - IDs to specifiy manually, overrides those loaded from the DICOM headers - **id_fields : dict[str, ty.Union[str, Tag, tuple[str, ty.Callable], tuple[Tag, ty.Callable]]] - The DICOM fields to extractx the IDs from. Values of the dictionary - can either be the DICOM field name or tag as a tuple (e.g. `("0001", "0008")`) - or a tuple containging the str/tag and a callable used to extract the - ID from. For regex expressions you can use the DicomScan.id_exractor method - """ - id_fields = copy(cls.DEFAULT_ID_FIELDS) - id_fields.update(id_fields) - - scans: dict[str, DicomScan] = {} - ids_dct = defaultdict(list) - subject_id_dct = defaultdict(list) - project_id_dct = defaultdict(list) - # TESTNAME_GePhantom_20230825_155050 - for dcm_file in dicom_files: - dcm = pydicom.dcmread(dcm_file) - scan_id = dcm.SeriesNumber - if "SECONDARY" in dcm.ImageType: - modality = "SC" - else: - modality = dcm.Modality - try: - scan = scans[scan_id] - except KeyError: - scan = scans[scan_id] = Scan(modality=modality) - else: - # Get scan modality (should be the same for all dicoms with the same series - # number) - assert modality == scan.modality - scan.files.append(dcm_file) - project_id_dct[dcm.get(project_field.keyword)].append(dcm_file) - subject_id_dct[dcm.get(subject_field.keyword)].append(dcm_file) - session_id_dct[dcm.get(session_field.keyword)].append(dcm_file) - errors: list[str] = [] - project_id: str = spec.get("project_id") # type: ignore - subject_id: str = spec.get("subject_id") # type: ignore - session_id: str = spec.get("session_id") # type: ignore - if project_id is None: - project_ids = list(project_id_dct) - if len(list(project_ids)) > 1: - errors.append( - f"Incosistent project IDs found in {project_field}:\n" - + json.dumps(project_id_dct, indent=4) - ) - else: - project_id = project_ids[0] - if not project_id: - logger.error(f"Project ID ({project_field}) not provided") - if subject_id is None: - subject_ids = list(subject_id_dct) - if len(subject_ids) > 1: - errors.append( - f"Incosistent subject IDs found in {subject_field}:\n" - + json.dumps(subject_id_dct, indent=4) - ) - else: - # FIXME: space is present in test data, but shouldn't be in prod - subject_id = subject_ids[0].replace(" ", "_") - if not subject_id: - errors.append(f"Subject ID ({subject_field}) not provided") - if session_id is None: - session_ids = list(session_id_dct) - if len(session_ids) > 1: - errors.append( - f"Incosistent session IDs found in {session_field}:\n" - + json.dumps(session_id_dct, indent=4) - ) - else: - session_id = session_ids[0] - if not session_id: - errors.append(f"Session ID ({session_field}) not provided") - if errors: - raise DicomParseError("\n".join(errors)) - associated_file_dir_name = "_".join(dcm.PatientName.split("^")) + "_" + dcm.StudyDate - return scans, SessionMetadata( - project_id, subject_id, session_id, associated_file_dir_name - ) diff --git a/xnat_ingest/session.py b/xnat_ingest/session.py index c4b0cb5..a67f9f1 100644 --- a/xnat_ingest/session.py +++ b/xnat_ingest/session.py @@ -14,7 +14,7 @@ import pydicom from fileformats.application import Dicom from fileformats.medimage import DicomSeries -from fileformats.core import from_paths, FileSet, DataType +from fileformats.core import from_paths, FileSet, DataType, from_mime, to_mime from fileformats.generic import File, Directory from arcana.core.data.set import Dataset from arcana.core.data.space import DataSpace @@ -29,12 +29,24 @@ logger = logging.getLogger("xnat-ingest") -def dicoms_converter( - multi_dicom_series: ty.Union[ty.List[DicomSeries], ty.Dict[str, DicomSeries]] -) -> ty.Dict[str, DicomSeries]: - if isinstance(multi_dicom_series, ty.Sequence): - multi_dicom_series = {str(s["SeriesNumber"]): s for s in multi_dicom_series} - return multi_dicom_series +def resources_converter( + resources: ty.Union[ + ty.List[DicomSeries], ty.Dict[ty.Tuple[str, str], ty.Tuple[str, FileSet]] + ] +) -> ty.Dict[ty.Tuple[str, str], ty.Tuple[str, FileSet]]: + if isinstance(resources, ty.Sequence): + resources_dict = {} + for resource in resources: + if not isinstance(resource, DicomSeries): + raise TypeError( + f"Only sequences of DicomSeries can be converted, otherwise needs " + f"to be already in a dictionary, found {resources}" + ) + resources_dict[ + (str(resource["SeriesNumber"]), "DICOM") + ] = (str(resource["SeriesDescription"]), resource) + resources = resources_dict + return resources @attrs.define(slots=False) @@ -42,9 +54,9 @@ class ImagingSession: project_id: str subject_id: str session_id: str - dicoms: ty.Dict[str, DicomSeries] = attrs.field( - factory=dict, converter=dicoms_converter - ) + resources: ty.Dict[ty.Tuple[str, str], ty.Tuple[str, FileSet]] = attrs.field( + factory=dict, converter=resources_converter + ) # keys -> scan-id & resource-type, values -> description, scan associated_files_pattern: str | None = None associated_file_fspaths: ty.List[Path] = attrs.field(factory=list) @@ -72,7 +84,6 @@ def select_resources( dataset: Dataset, include_all_dicoms: bool = False, include_all_assoc: bool = False, - assoc_id_pattern: str = None ) -> ty.Iterator[ty.Tuple[str, str, str, FileSet]]: """Returns selected resources that match the columns in the dataset definition @@ -95,7 +106,7 @@ def select_resources( scan : FileSet a fileset to upload """ - store = MockDataStore(self, assoc_id_pattern=assoc_id_pattern) + store = MockDataStore(self) uploaded: ty.Set[FileSet] = set() @@ -185,7 +196,6 @@ def construct( cls, dicoms_path: str | Path, associated_files_pattern: str | None = None, - assoc_files_identification: str | None = None, project_field: str = "StudyID", subject_field: str = "PatientID", session_field: str = "AccessionNumber", @@ -205,10 +215,6 @@ def construct( are substituted before the string is used to glob the non-DICOM files. In order to deidentify the filenames, the pattern must explicitly reference all identifiable fields in string template placeholders. - assoc_files_identification : str, optional - Used to extract the scan ID & type/resource from the associated filename. Should - be a regular-expression (Python syntax) with named groups called 'id' and 'type', e.g. - '[^\.]+\.[^\.]+\.(?P\d+)\.(?P\w+)\..*' project_field : str the name of the DICOM field that is to be interpreted as the corresponding XNAT project @@ -238,9 +244,6 @@ def construct( else: dicom_fspaths = [Path(p) for p in glob(dicoms_path)] - if assoc_files_identification: - raise NotImplementedError - # Sort loaded series by StudyInstanceUID (imaging session) logger.info("Loading DICOM series from %s", str(dicoms_path)) dicom_sessions = defaultdict(list) @@ -280,7 +283,7 @@ def get_id(field): sessions.append( cls( - dicoms={str(s["SeriesNumber"]): s for s in session_dicom_series}, + resources=session_dicom_series, associated_file_fspaths=associated_file_fspaths, associated_files_pattern=associated_files_pattern, project_id=(project_id if project_id else get_id(project_field)), @@ -312,7 +315,13 @@ def load(cls, save_dir: Path): "is a valid YAML file", ) raise e - dct["dicoms"] = {k: DicomSeries(v) for k, v in dct["dicoms"].items()} + dct["resources"] = { + (rd["scan_id"], rd["resource"]): ( + rd["description"], + from_mime(rd["datatype"])(rd["fspaths"]), + ) + for rd in dct["resources"] + } dct["associated_file_fspaths"] = [ Path(f) for f in dct["associated_file_fspaths"] ] @@ -327,11 +336,20 @@ def save(self, save_dir: Path): yaml_file : Path name of the file to load the manually specified IDs from (YAML format) """ - dct = attrs.asdict(self, recurse=True) + dct = attrs.asdict(self, recurse=False) dct["associated_file_fspaths"] = [ str(p) for p in dct["associated_file_fspaths"] ] - dct["dicoms"] = {k: [str(p) for p in v["fspaths"]] for k, v in dct["dicoms"].items()} + dct["resources"] = [ + { + "scan_id": id_, + "resource": res, + "description": desc, + "datatype": to_mime(scan, official=False), + "fspaths": [str(p) for p in scan.fspaths], + } + for (id_, res), (desc, scan) in dct["resources"].items() + ] yaml_file = save_dir / self.SAVE_FILENAME with open(yaml_file, "w") as f: yaml.dump( @@ -339,14 +357,20 @@ def save(self, save_dir: Path): f, ) - def deidentify(self, dest_dir: Path) -> "ImagingSession": - """Deidentify files by removing the fields listed `FIELDS_TO_ANONYMISE` and + def stage( + self, dest_dir: Path, assoc_files_identification: str | None = None + ) -> "ImagingSession": + """Stages and deidentifies files by removing the fields listed `FIELDS_TO_ANONYMISE` and replacing birth date with 01/01/ and returning new imaging session Parameters ---------- dest_dir : Path destination directory to save the deidentified files + assoc_files_identification : str, optional + Used to extract the scan ID & type/resource from the associated filename. Should + be a regular-expression (Python syntax) with named groups called 'id' and 'type', e.g. + '[^\.]+\.[^\.]+\.(?P\d+)\.(?P\w+)\..*' Returns ------- @@ -579,23 +603,11 @@ def populate_row(self, row: DataRow): row : DataRow The row to populate with entries """ - series_numbers = [] - for series_number, dcm in self.session.dicoms.items(): + for (scan_id, scan_type), (scan_desc, scan) in self.session.scans.items(): row.add_entry( - path=dcm["SeriesDescription"], - datatype=DicomSeries, - uri=f"dicom::{series_number}", - ) - series_numbers.append(series_number) - - collated = defaultdict(list) - for assoc_fspath in self.session.associated_file_fspaths: - - for resource in collated: - row.add_entry( - path=assoc_fspath.name, - datatype=FileSet, - uri=f"associated_file::{assoc_fspath}", + path=scan_desc, + datatype=type(scan), + uri=(scan_id, scan_type), ) def get(self, entry: DataEntry, datatype: type) -> DataType: @@ -614,12 +626,7 @@ def get(self, entry: DataEntry, datatype: type) -> DataType: item : DataType the item stored within the specified entry """ - file_category, path = entry.uri.split("::") - if file_category == "dicom": - fileset = datatype(self.session.dicoms[path]) - else: - fileset = datatype(path) - return fileset + return datatype(self.session.scans[entry.uri]) ###################################### # The following methods can be empty # diff --git a/xnat_ingest/tests/test_session.py b/xnat_ingest/tests/test_session.py index 3f8b503..5be8c82 100644 --- a/xnat_ingest/tests/test_session.py +++ b/xnat_ingest/tests/test_session.py @@ -36,7 +36,7 @@ def imaging_session() -> ImagingSession: project_id="PROJECTID", subject_id="SUBJECTID", session_id="SESSIONID", - dicoms=[ + resources=[ DicomSeries(d.iterdir()) for d in ( get_pet_image(PatientName=PatientName),