diff --git a/mriqc/cli/parser.py b/mriqc/cli/parser.py
index 6d24b8fa5..302d3227b 100644
--- a/mriqc/cli/parser.py
+++ b/mriqc/cli/parser.py
@@ -479,7 +479,6 @@ def _bids_filter(value):
 
 def parse_args(args=None, namespace=None):
     """Parse args and run further checks on the command line."""
-    from contextlib import suppress
     from json import loads
     from logging import DEBUG, FileHandler
     from pathlib import Path
@@ -490,6 +489,7 @@ def parse_args(args=None, namespace=None):
     from mriqc import __version__
     from mriqc._warnings import DATE_FMT, LOGGER_FMT, _LogFormatter
     from mriqc.messages import PARTICIPANT_START
+    from mriqc.utils.misc import initialize_meta_and_data
 
     parser = _build_parser()
     opts = parser.parse_args(args, namespace)
@@ -554,10 +554,9 @@ def parse_args(args=None, namespace=None):
     if output_dir == bids_dir:
         parser.error(
             'The selected output folder is the same as the input BIDS folder. '
-            'Please modify the output path (suggestion: %s).'
-            % bids_dir
-            / 'derivatives'
-            / ('mriqc-%s' % version.split('+')[0])
+            'Please modify the output path (suggestion: '
+            f"{bids_dir / 'derivatives' / ('mriqc-' + version.split('+')[0])}"
+            ').'
         )
 
     if bids_dir in work_dir.parents:
@@ -642,11 +641,7 @@ def parse_args(args=None, namespace=None):
             f'MRIQC is unable to process the following modalities: {", ".join(unknown_mods)}.'
         )
 
-    # Estimate the biggest file size / leave 1GB if some file does not exist (datalad)
-    with suppress(FileNotFoundError):
-        config.workflow.biggest_file_gb = _get_biggest_file_size_gb(
-            config.workflow.inputs.values()
-        )
+    initialize_meta_and_data()
 
     # set specifics for alternative populations
    if opts.species.lower() != 'human':
@@ -660,17 +655,3 @@ def parse_args(args=None, namespace=None):
         config.workflow.fd_radius = 7.5
         # block uploads for the moment; can be reversed before wider release
         config.execution.no_sub = True
-
-
-def _get_biggest_file_size_gb(files):
-    """Identify the largest file size (allows multi-echo groups)."""
-
-    import os
-
-    sizes = []
-    for file in files:
-        if isinstance(file, (list, tuple)):
-            sizes.append(_get_biggest_file_size_gb(file))
-        else:
-            sizes.append(os.path.getsize(file))
-    return max(sizes) / (1024**3)
diff --git a/mriqc/cli/run.py b/mriqc/cli/run.py
index c8839f060..cf43d4752 100644
--- a/mriqc/cli/run.py
+++ b/mriqc/cli/run.py
@@ -266,7 +266,10 @@ def main(argv=None):
                 )
             ),
         )
 
-    config.to_filename(config.execution.log_dir / f'config-{config.execution.run_uuid}.toml')
+    config.to_filename(
+        config.execution.log_dir / f'config-{config.execution.run_uuid}.toml',
+        store_inputs=False,  # Inputs are no longer needed
+    )
     sys.exit(exitcode)
diff --git a/mriqc/config.py b/mriqc/config.py
index d57521815..d26968f54 100644
--- a/mriqc/config.py
+++ b/mriqc/config.py
@@ -91,6 +91,7 @@
 from __future__ import annotations
 
 import os
+import pickle
 import sys
 from contextlib import suppress
 from pathlib import Path
@@ -576,8 +577,8 @@ class workflow(_Config):
 
     analysis_level: list[str] = ['participant']
     """Level of analysis."""
-    biggest_file_gb: int = 1
-    """Size of largest file in GB."""
+    biggest_file_gb: dict[str, float] | int = 1
+    """Dictionary holding the size of largest file in GB (per modality)."""
     deoblique: bool = False
     """Deoblique the functional scans during head motion correction preprocessing."""
     despike: bool = False
@@ -590,6 +591,12 @@ class workflow(_Config):
     """Turn on FFT based spike detector (slow)."""
     inputs: list[str | os.PathLike] | None = None
     """List of files to be processed with MRIQC."""
+    inputs_entities: dict[str, list[dict]] | None = None
+    """List of entities corresponding to inputs."""
+    inputs_metadata: dict[str, list[dict | list[dict]]] | None = None
+    """List of metadata corresponding to inputs."""
+    inputs_path: Path | None = None
+    """Path to a pickle file with the input paths and metadata."""
     min_len_dwi: int = 7
     """
     Minimum DWI length to be considered a "processable" dataset
@@ -602,6 +609,21 @@ class workflow(_Config):
     template_id: str = 'MNI152NLin2009cAsym'
     """TemplateFlow ID of template used for the anatomical processing."""
 
+    _hidden: tuple[str, ...] = ('inputs', 'inputs_entities', 'inputs_metadata')
+
+    @classmethod
+    def init(cls) -> None:
+        if cls.inputs_path is None:
+            cls.inputs_path = execution.work_dir / f'inputs-{execution.run_uuid}.pkl'
+
+        if cls.inputs_path.exists():
+            with open(cls.inputs_path, 'rb') as handle:
+                _inputs = pickle.load(handle)
+
+            cls.inputs = _inputs['paths']
+            cls.inputs_metadata = _inputs['metadata']
+            cls.inputs_entities = _inputs['entities']
+
 
 class loggers:
     """Keep loggers easily accessible (see :py:func:`init`)."""
@@ -727,7 +749,10 @@ def dumps() -> str:
     return dumps(get())
 
 
-def to_filename(filename: str | os.PathLike | None = None) -> Path:
+def to_filename(
+    filename: str | os.PathLike | None = None,
+    store_inputs: bool = True,
+) -> Path:
     """Write settings to file."""
 
     if filename:
@@ -738,6 +763,21 @@
     settings.file_path.parent.mkdir(exist_ok=True, parents=True)
     settings.file_path.write_text(dumps())
     loggers.cli.debug(f'Saved MRIQC config file: {settings.file_path}.')
+
+    if store_inputs:
+        if workflow.inputs_path is None:
+            workflow.inputs_path = execution.work_dir / f'inputs-{execution.run_uuid}.pkl'
+
+        # Pickle inputs
+        with open(workflow.inputs_path, 'wb') as handle:
+            inputs_dict = {
+                'paths': workflow.inputs,
+                'metadata': workflow.inputs_metadata,
+                'entities': workflow.inputs_entities,
+            }
+            pickle.dump(inputs_dict, handle, protocol=pickle.HIGHEST_PROTOCOL)
+
+        loggers.cli.debug(f'Saved MRIQC inputs file: {workflow.inputs_path}.')
     return settings.file_path
diff --git a/mriqc/interfaces/bids.py b/mriqc/interfaces/bids.py
index 1d5232c00..a2de52276 100644
--- a/mriqc/interfaces/bids.py
+++ b/mriqc/interfaces/bids.py
@@ -42,15 +42,19 @@
 
 class IQMFileSinkInputSpec(DynamicTraitedSpec, BaseInterfaceInputSpec):
     in_file = Str(mandatory=True, desc='path of input file')
-    subject_id = Str(mandatory=True, desc='the subject id')
     modality = Str(mandatory=True, desc='the qc type')
+    entities = traits.Dict(desc='entities corresponding to the input')
+    subject_id = Str(desc='the subject id')
     session_id = traits.Either(None, Str, usedefault=True)
     task_id = traits.Either(None, Str, usedefault=True)
     acq_id = traits.Either(None, Str, usedefault=True)
     rec_id = traits.Either(None, Str, usedefault=True)
     run_id = traits.Either(None, traits.Int, usedefault=True)
     dataset = Str(desc='dataset identifier')
-    dismiss_entities = traits.List(['part'], usedefault=True)
+    dismiss_entities = traits.List(
+        ['datatype', 'part', 'echo', 'extension', 'suffix'],
+        usedefault=True,
+    )
     metadata = traits.Dict()
     provenance = traits.Dict()
 
@@ -156,7 +160,7 @@ def _run_interface(self, runtime):
         )
 
         # Fill in the "bids_meta" key
-        id_dict = {}
+        id_dict = self.inputs.entities if isdefined(self.inputs.entities) else {}
         for comp in BIDS_COMP:
             comp_val = getattr(self.inputs, comp, None)
             if isdefined(comp_val) and comp_val is not None:
diff --git a/mriqc/interfaces/webapi.py b/mriqc/interfaces/webapi.py
index d50915cd1..024373056 100644
--- a/mriqc/interfaces/webapi.py
+++ b/mriqc/interfaces/webapi.py
@@ -20,6 +20,8 @@
 #
 #     https://www.nipreps.org/community/licensing/
 #
+import json
+
 from nipype.interfaces.base import (
     BaseInterfaceInputSpec,
     Bunch,
@@ -116,6 +118,11 @@ class UploadIQMsInputSpec(BaseInterfaceInputSpec):
     auth_token = Str(mandatory=True, desc='authentication token')
     email = Str(desc='set sender email')
     strict = traits.Bool(False, usedefault=True, desc='crash if upload was not successful')
+    modality = Str(
+        'undefined',
+        usedefault=True,
+        desc='fallback modality when it cannot be inferred from the metadata',
+    )
 
 
 class UploadIQMsOutputSpec(TraitedSpec):
@@ -138,11 +145,12 @@ def _run_interface(self, runtime):
 
         self._results['api_id'] = None
 
-        response = upload_qc_metrics(
+        response, payload = upload_qc_metrics(
             self.inputs.in_iqms,
             endpoint=self.inputs.endpoint,
             auth_token=self.inputs.auth_token,
             email=email,
+            modality=self.inputs.modality,
         )
 
         try:
@@ -151,7 +159,9 @@ def _run_interface(self, runtime):
             # response did not give us an ID
             errmsg = (
                 'QC metrics upload failed to create an ID for the record '
-                f'uplOADED. rEsponse from server follows: {response.text}'
+                f'uploaded. Response from server follows: {response.text}'
+                '\n\nPayload:\n'
+                f'{json.dumps(payload, indent=2)}'
             )
             config.loggers.interface.warning(errmsg)
 
@@ -159,13 +169,20 @@ def _run_interface(self, runtime):
             config.loggers.interface.info(messages.QC_UPLOAD_COMPLETE)
             return runtime
 
-        errmsg = 'QC metrics failed to upload. Status %d: %s' % (
-            response.status_code,
-            response.text,
+        errmsg = '\n'.join(
+            [
+                'Unsuccessful upload.',
+                f'Server response status {response.status_code}:',
+                response.text,
+                '',
+                '',
+                'Payload:',
+                json.dumps(payload, indent=2),
+            ]
         )
         config.loggers.interface.warning(errmsg)
         if self.inputs.strict:
-            raise RuntimeError(response.text)
+            raise RuntimeError(errmsg)
 
         return runtime
 
@@ -175,6 +192,7 @@ def upload_qc_metrics(
     endpoint=None,
     email=None,
     auth_token=None,
+    modality=None,
 ):
     """
     Upload qc metrics to remote repository.
@@ -205,33 +223,43 @@ def upload_qc_metrics(
 
     # Extract metadata and provenance
     meta = in_data.pop('bids_meta')
-
-    # For compatibility with WebAPI. Should be rolled back to int
-    if meta.get('run_id', None) is not None:
-        meta['run_id'] = '%d' % meta.get('run_id')
-
     prov = in_data.pop('provenance')
 
     # At this point, data should contain only IQMs
     data = deepcopy(in_data)
 
     # Check modality
-    modality = meta.get('modality', 'None')
+    modality = meta.get('modality', None) or meta.get('suffix', None) or modality
     if modality not in ('T1w', 'bold', 'T2w'):
         errmsg = (
             'Submitting to MRIQCWebAPI: image modality should be "bold", "T1w", or "T2w", '
-            '(found "%s")' % modality
+            f'(found "{modality}")'
         )
-        return Bunch(status_code=1, text=errmsg)
+        return Bunch(status_code=1, text=errmsg), None
 
     # Filter metadata values that aren't in whitelist
     data['bids_meta'] = {k: meta[k] for k in META_WHITELIST if k in meta}
+
+    # Check for fields with appended _id
+    bids_meta_names = {k: k.replace('_id', '') for k in META_WHITELIST if k.endswith('_id')}
+    data['bids_meta'].update({k: meta[v] for k, v in bids_meta_names.items() if v in meta})
+
+    # For compatibility with WebAPI. Should be rolled back to int
+    if (run_id := data['bids_meta'].get('run_id', None)) is not None:
+        data['bids_meta']['run_id'] = f'{run_id}'
+
+    # One more chance for spelled-out BIDS entity acquisition
+    if (acq_id := meta.get('acquisition', None)) is not None:
+        data['bids_meta']['acq_id'] = acq_id
+
     # Filter provenance values that aren't in whitelist
     data['provenance'] = {k: prov[k] for k in PROV_WHITELIST if k in prov}
 
     # Hash fields that may contain personal information
     data['bids_meta'] = _hashfields(data['bids_meta'])
 
+    data['bids_meta']['modality'] = modality
+
     if email:
         data['provenance']['email'] = email
@@ -248,10 +276,10 @@ def upload_qc_metrics(
             timeout=15,
         )
     except requests.ConnectionError as err:
-        errmsg = 'QC metrics failed to upload due to connection error shown below:\n%s' % err
-        return Bunch(status_code=1, text=errmsg)
+        errmsg = f'QC metrics failed to upload due to connection error shown below:\n{err}'
+        return Bunch(status_code=1, text=errmsg), None
 
-    return response
+    return response, data
 
 
 def _hashfields(data):
diff --git a/mriqc/utils/misc.py b/mriqc/utils/misc.py
index 269b9b670..4c4bf4033 100644
--- a/mriqc/utils/misc.py
+++ b/mriqc/utils/misc.py
@@ -22,11 +22,19 @@
 #
 """Helper functions."""
 
+from __future__ import annotations
+
+import asyncio
 import json
 from collections import OrderedDict
 from collections.abc import Iterable
+from functools import partial
+from os import cpu_count
 from pathlib import Path
+from typing import Callable, TypeVar
 
+import nibabel as nb
+import numpy as np
 import pandas as pd
 
 try:
@@ -34,6 +42,8 @@
 except ImportError:
     from collections.abc import MutableMapping
 
+R = TypeVar('R')
+
 IMTYPES = {
     'T1w': 'anat',
     'T2w': 'anat',
@@ -59,6 +69,12 @@
 """
 
 
+async def worker(job: Callable[[], R], semaphore) -> R:
+    async with semaphore:
+        loop = asyncio.get_running_loop()
+        return await loop.run_in_executor(None, job)
+
+
 def reorder_csv(csv_file, out_file=None):
     """
     Put subject, session and scan in front of csv file
@@ -168,7 +184,7 @@ def generate_pred(derivatives_dir, output_dir, mod):
 
     # Drop duplicates
     dataframe.drop_duplicates(bdits_cols, keep='last', inplace=True)
 
-    out_csv = Path(output_dir) / ('%s_predicted_qa_csv' % mod)
+    out_csv = Path(output_dir) / f'{mod}_predicted_qa_csv'
     dataframe[bdits_cols + ['mriqc_pred']].to_csv(str(out_csv), index=False)
     return out_csv
 
@@ -179,7 +195,7 @@ def generate_tsv(output_dir, mod):
     """
 
     # If some were found, generate the CSV file and group report
-    out_tsv = output_dir / ('group_%s.tsv' % mod)
+    out_tsv = output_dir / f'group_{mod}.tsv'
     jsonfiles = list(output_dir.glob(f'sub-*/**/{IMTYPES[mod]}/sub-*_{mod}.json'))
     if not jsonfiles:
         return None, out_tsv
@@ -249,7 +265,7 @@ def _flatten_list(xs):
 def _datalad_get(input_list, nprocs=None):
     from mriqc import config
 
-    if not config.execution.bids_dir_datalad:
+    if not config.execution.bids_dir_datalad or not config.execution.datalad_get:
         return
 
     # Delay datalad import until we're sure we'll need it
@@ -273,3 +289,267 @@ def _datalad_get(input_list, nprocs=None):
             config.nipype.nprocs,
         ),
     )
+
+
+def _file_meta_and_size(
+    files: list | str,
+    volmin: int | None = 1,
+    volmax: int | None = None,
+):
+    """
+    Extract metadata, BIDS entities, and file size (supports multi-echo groups).
+
+    Parameters
+    ----------
+    files : :obj:`list`
+        List of :obj:`os.pathlike` or sublist of :obj:`os.pathlike` (multi-echo case)
+        of files to be extracted.
+    volmin : :obj:`int`
+        Minimum number of volumes that inputs must have.
+    volmax : :obj:`int`
+        Maximum number of volumes that inputs must have.
+
+    Returns
+    -------
+    :obj:`tuple`
+        A tuple (metadata, entities, sizes, valid) of items containing the different
+        aspects extracted from the input(s).
+
+    """
+
+    import os
+
+    from mriqc import config
+
+    multifile = isinstance(files, (list, tuple))
+    if multifile:
+        metadata = []
+        _bids_list = []
+        _size_list = []
+        _valid_list = []
+
+        for filename in files:
+            metadata_i, entities_i, sizes_i, valid_i = _file_meta_and_size(
+                filename,
+                volmin=volmin,
+                volmax=volmax,
+            )
+
+            # Add to output lists
+            metadata.append(metadata_i)
+            _bids_list.append(entities_i)
+            _size_list.append(sizes_i)
+            _valid_list.append(valid_i)
+
+        valid = all(_valid_list) and len({_m['NumberOfVolumes'] for _m in metadata}) == 1
+        return metadata, _merge_entities(_bids_list), np.sum(_size_list), valid
+
+    metadata = config.execution.layout.get_metadata(files)
+    entities = config.execution.layout.parse_file_entities(files)
+    size = os.path.getsize(files) / (1024**3)
+
+    metadata['FileSize'] = size
+    metadata['FileSizeUnits'] = 'GB'
+
+    try:
+        nii = nb.load(files)
+        nifti_len = nii.shape[3]
+    except nb.filebasedimages.ImageFileError:
+        nifti_len = None
+    except IndexError:  # shape has only 3 elements
+        nifti_len = 1 if nii.dataobj.ndim == 3 else -1
+
+    valid = True
+    if volmin is not None:
+        valid = nifti_len >= volmin
+
+    if valid and volmax is not None:
+        valid = nifti_len <= volmax
+
+    metadata['NumberOfVolumes'] = nifti_len
+
+    return metadata, entities, size, valid
+
+
+async def _extract_meta_and_size(
+    filelist: list,
+    volmin: int | None = 1,
+    volmax: int | None = None,
+    max_concurrent: int = min(cpu_count(), 12),
+) -> tuple[list, list, list, list]:
+    """
+    Extract corresponding metadata and file size in GB.
+
+    Parameters
+    ----------
+    filelist : :obj:`list`
+        List of :obj:`os.pathlike` or sublist of :obj:`os.pathlike` (multi-echo case)
+        of files to be extracted.
+    volmin : :obj:`int`
+        Minimum number of volumes that inputs must have.
+    volmax : :obj:`int`
+        Maximum number of volumes that inputs must have.
+    max_concurrent : :obj:`int`
+        Maximum number of concurrent coroutines (files or multi-echo sets).
+
+    Returns
+    -------
+    :obj:`tuple`
+        A tuple (metadata, entities, sizes, valid) of lists containing the different
+        aspects extracted from inputs.
+
+    """
+
+    semaphore = asyncio.Semaphore(max_concurrent)
+    tasks = []
+    for filename in filelist:
+        tasks.append(
+            asyncio.create_task(
+                worker(
+                    partial(
+                        _file_meta_and_size,
+                        filename,
+                        volmin=volmin,
+                        volmax=volmax,
+                    ),
+                    semaphore,
+                )
+            )
+        )
+
+    # Gather guarantees the order of the output
+    metadata, entities, sizes, valid = list(zip(*await asyncio.gather(*tasks)))
+    return metadata, entities, sizes, valid
+
+
+def initialize_meta_and_data(
+    max_concurrent: int = min(cpu_count(), 12),
+) -> None:
+    """
+    Mine data and metadata corresponding to the dataset.
+
+    Get files if datalad enabled and extract the necessary metadata.
+
+    Parameters
+    ----------
+    max_concurrent : :obj:`int`
+        Maximum number of concurrent coroutines (files or multi-echo sets).
+
+    Returns
+    -------
+    :obj:`None`
+
+    """
+    from mriqc import config
+
+    # Datalad-get all files
+    dataset = config.workflow.inputs.values()
+    _datalad_get(dataset)
+
+    # Extract metadata and filesize
+    config.workflow.inputs_metadata = {}
+    config.workflow.inputs_entities = {}
+    config.workflow.biggest_file_gb = {}
+    for mod, input_list in config.workflow.inputs.items():
+        config.loggers.cli.log(
+            25,
+            f'Extracting metadata and entities for {len(input_list)} input runs '
+            f"of modality '{mod}'...",
+        )
+
+        # Some modalities require a minimum number of volumes
+        volmin = None
+        if mod == 'bold':
+            volmin = config.workflow.min_len_bold
+        elif mod == 'dwi':
+            volmin = config.workflow.min_len_dwi
+
+        # Some modalities require a maximum number of volumes
+        volmax = None
+        if mod in ('T1w', 'T2w'):
+            volmax = 1
+
+        # Run extraction in an asyncio coroutine loop
+        metadata, entities, size, valid = asyncio.run(
+            _extract_meta_and_size(
+                input_list,
+                max_concurrent=max_concurrent,
+                volmin=volmin,
+                volmax=volmax,
+            )
+        )
+
+        # Identify nonconformant files that need to be dropped (and drop them)
+        if num_dropped := len(input_list) - np.sum(valid):
+            config.loggers.workflow.warning(
+                f'{num_dropped} input runs cannot be processed (too short or too long)'
+            )
+
+            filtered_results = [
+                _v[:-1]
+                for _v in zip(input_list, metadata, entities, size, valid)
+                if _v[-1] is True
+            ]
+            input_list, metadata, entities, size = list(zip(*filtered_results))
+            config.workflow.inputs[mod] = input_list
+
+        # Finalizing (write to config so that values are propagated)
+        _max_size = np.max(size)
+        config.workflow.inputs_metadata[mod] = metadata
+        config.workflow.inputs_entities[mod] = entities
+        config.workflow.biggest_file_gb[mod] = float(_max_size)  # Cast required to store YAML
+
+        config.loggers.cli.log(
+            25,
+            f"File size ('{mod}'): {_max_size:.2f}|{np.mean(size):.2f} " 'GB [maximum|average].',
+        )
+
+
+def _merge_entities(
+    entities: list,
+) -> dict:
+    """
+    Merge a list of entity dictionaries, dropping keys with nonuniform values.
+
+    Examples
+    --------
+    >>> _merge_entities([
+    ...     {'subject': '001', 'session': '001'},
+    ...     {'subject': '001', 'session': '002'},
+    ... ])
+    {'subject': '001'}
+
+    >>> _merge_entities([
+    ...     {'subject': '001', 'session': '002'},
+    ...     {'subject': '001', 'session': '002'},
+    ... ])
+    {'subject': '001', 'session': '002'}
+
+    >>> _merge_entities([
+    ...     {'subject': '001', 'session': '002'},
+    ...     {'subject': '001', 'session': '002', 'run': 1},
+    ... ])
+    {'subject': '001', 'session': '002'}
+
+    >>> _merge_entities([
+    ...     {'subject': '001', 'session': '002'},
+    ...     {'subject': '001', 'run': 1},
+    ... ])
+    {'subject': '001'}
+
+    """
+    out_entities = {}
+
+    bids_keys = set(entities[0].keys())
+    for entities_i in entities[1:]:
+        bids_keys.intersection_update(entities_i.keys())
+
+    # Preserve ordering
+    bids_keys = [_b for _b in entities[0].keys() if _b in bids_keys]
+
+    for key in bids_keys:
+        values = {_entities[key] for _entities in entities}
+        if len(values) == 1:
+            out_entities[key] = values.pop()
+
+    return out_entities
diff --git a/mriqc/workflows/anatomical/base.py b/mriqc/workflows/anatomical/base.py
index 74c5fc656..a697a7b4d 100644
--- a/mriqc/workflows/anatomical/base.py
+++ b/mriqc/workflows/anatomical/base.py
@@ -52,6 +52,8 @@
 
 """
 
+from itertools import chain
+
 from nipype.interfaces import utility as niu
 from nipype.pipeline import engine as pe
 from niworkflows.interfaces.fixes import FixHeaderApplyTransforms as ApplyTransforms
@@ -88,30 +90,52 @@ def anat_qc_workflow(name='anatMRIQC'):
     """
     from mriqc.workflows.shared import synthstrip_wf
 
-    dataset = config.workflow.inputs.get('t1w', []) + config.workflow.inputs.get('t2w', [])
-
+    # Enable if necessary
+    # mem_gb = max(
+    #     config.workflow.biggest_file_gb['t1w'],
+    #     config.workflow.biggest_file_gb['t2w'],
+    # )
+    dataset = list(
+        chain(
+            config.workflow.inputs.get('t1w', []),
+            config.workflow.inputs.get('t2w', []),
+        )
+    )
+    metadata = list(
+        chain(
+            config.workflow.inputs_metadata.get('t1w', []),
+            config.workflow.inputs_metadata.get('t2w', []),
+        )
+    )
+    entities = list(
+        chain(
+            config.workflow.inputs_entities.get('t1w', []),
+            config.workflow.inputs_entities.get('t2w', []),
+        )
+    )
     message = BUILDING_WORKFLOW.format(
         modality='anatomical',
-        detail=(
-            f'for {len(dataset)} NIfTI files.'
-            if len(dataset) > 2
-            else f"({' and '.join('<%s>' % v for v in dataset)})."
-        ),
+        detail=f'for {len(dataset)} NIfTI files.',
     )
     config.loggers.workflow.info(message)
 
-    if config.execution.datalad_get:
-        from mriqc.utils.misc import _datalad_get
-
-        _datalad_get(dataset)
-
     # Initialize workflow
     workflow = pe.Workflow(name=name)
 
     # Define workflow, inputs and outputs
     # 0. Get data
-    inputnode = pe.Node(niu.IdentityInterface(fields=['in_file']), name='inputnode')
-    inputnode.iterables = [('in_file', dataset)]
+    inputnode = pe.Node(
+        niu.IdentityInterface(
+            fields=['in_file', 'metadata', 'entities'],
+        ),
+        name='inputnode',
+    )
+    inputnode.synchronize = True  # Do not test combinations of iterables
+    inputnode.iterables = [
+        ('in_file', dataset),
+        ('metadata', metadata),
+        ('entities', entities),
+    ]
 
     outputnode = pe.Node(niu.IdentityInterface(fields=['out_json']), name='outputnode')
@@ -146,7 +170,9 @@ def anat_qc_workflow(name='anatMRIQC'):
             ('in_file', 'inputnode.name_source'),
         ]),
         (inputnode, to_ras, [('in_file', 'in_file')]),
-        (inputnode, iqmswf, [('in_file', 'inputnode.in_file')]),
+        (inputnode, iqmswf, [('in_file', 'inputnode.in_file'),
+                             ('metadata', 'inputnode.metadata'),
+                             ('entities', 'inputnode.entities')]),
        (inputnode, norm, [(('in_file', _get_mod), 'inputnode.modality')]),
         (to_ras, skull_stripping, [('out_file', 'inputnode.in_files')]),
         (skull_stripping, hmsk, [
@@ -403,7 +429,6 @@ def compute_iqms(name='ComputeIQMs'):
         wf = compute_iqms()
 
     """
-    from niworkflows.interfaces.bids import ReadSidecarJSON
 
     from mriqc.interfaces.anatomical import Harmonize
     from mriqc.workflows.utils import _tofloat
@@ -413,6 +438,8 @@ def compute_iqms(name='ComputeIQMs'):
         niu.IdentityInterface(
             fields=[
                 'in_file',
+                'metadata',
+                'entities',
                 'in_ras',
                 'brainmask',
                 'airmask',
@@ -424,7 +451,6 @@ def compute_iqms(name='ComputeIQMs'):
                 'inu_corrected',
                 'in_inu',
                 'pvms',
-                'metadata',
                 'std_tpms',
             ]
         ),
@@ -435,9 +461,6 @@ def compute_iqms(name='ComputeIQMs'):
         name='outputnode',
     )
 
-    # Extract metadata
-    meta = pe.Node(ReadSidecarJSON(index_db=config.execution.bids_database_dir), name='metadata')
-
     # Add provenance
     addprov = pe.Node(AddProvenance(), name='provenance', run_without_submitting=True)
 
@@ -472,17 +495,12 @@ def _getwm(inlist):
     # fmt: off
     workflow.connect([
-        (inputnode, meta, [('in_file', 'in_file')]),
-        (inputnode, datasink, [('in_file', 'in_file'),
-                               (('in_file', _get_mod), 'modality')]),
+        (inputnode, datasink, [
+            ('in_file', 'in_file'),
+            (('in_file', _get_mod), 'modality'),
+            ('metadata', 'metadata'),
+            ('entities', 'entities')]),
         (inputnode, addprov, [(('in_file', _get_mod), 'modality')]),
-        (meta, datasink, [('subject', 'subject_id'),
-                          ('session', 'session_id'),
-                          ('task', 'task_id'),
-                          ('acquisition', 'acq_id'),
-                          ('reconstruction', 'rec_id'),
-                          ('run', 'run_id'),
-                          ('out_dict', 'metadata')]),
         (inputnode, addprov, [('in_file', 'in_file'),
                               ('airmask', 'air_msk'),
                               ('rotmask', 'rot_msk')]),
diff --git a/mriqc/workflows/diffusion/base.py b/mriqc/workflows/diffusion/base.py
index 11573409a..2155b74a1 100644
--- a/mriqc/workflows/diffusion/base.py
+++ b/mriqc/workflows/diffusion/base.py
@@ -43,9 +43,6 @@
 This workflow is orchestrated by :py:func:`dmri_qc_workflow`.
 """
 
-from pathlib import Path
-
-import numpy as np
 from nipype.interfaces import utility as niu
 from nipype.pipeline import engine as pe
 
@@ -87,45 +84,32 @@ def dmri_qc_workflow(name='dwiMRIQC'):
     from mriqc.messages import BUILDING_WORKFLOW
     from mriqc.workflows.shared import synthstrip_wf as dmri_bmsk_workflow
 
-    workflow = pe.Workflow(name=name)
-
-    dataset = config.workflow.inputs.get('dwi', [])
-
-    full_data = []
-
-    for dwi_path in dataset:
-        bval = config.execution.layout.get_bval(dwi_path)
-        if bval and Path(bval).exists() and len(np.loadtxt(bval)) > config.workflow.min_len_dwi:
-            full_data.append(dwi_path)
-        else:
-            config.loggers.workflow.warn(
-                f'Dismissing {dwi_path} for processing. b-values are missing or '
-                'insufficient in number to execute the workflow.'
-            )
-
-    if set(dataset) - set(full_data):
-        config.workflow.inputs['dwi'] = full_data
-        config.to_filename()
-
+    # Enable if necessary
+    # mem_gb = config.workflow.biggest_file_gb['dwi']
+    dataset = config.workflow.inputs['dwi']
+    metadata = config.workflow.inputs_metadata['dwi']
+    entities = config.workflow.inputs_entities['dwi']
     message = BUILDING_WORKFLOW.format(
         modality='diffusion',
-        detail=(
-            f'for {len(full_data)} NIfTI files.'
-            if len(full_data) > 2
-            else f"({' and '.join('<%s>' % v for v in full_data)})."
-        ),
+        detail=f'for {len(dataset)} NIfTI files.',
     )
     config.loggers.workflow.info(message)
 
-    if config.execution.datalad_get:
-        from mriqc.utils.misc import _datalad_get
-
-        _datalad_get(full_data)
-
     # Define workflow, inputs and outputs
     # 0. Get data, put it in RAS orientation
-    inputnode = pe.Node(niu.IdentityInterface(fields=['in_file']), name='inputnode')
-    inputnode.iterables = [('in_file', full_data)]
+    workflow = pe.Workflow(name=name)
+    inputnode = pe.Node(
+        niu.IdentityInterface(
+            fields=['in_file', 'metadata', 'entities'],
+        ),
+        name='inputnode',
+    )
+    inputnode.synchronize = True  # Do not test combinations of iterables
+    inputnode.iterables = [
+        ('in_file', dataset),
+        ('metadata', metadata),
+        ('entities', entities),
+    ]
 
     sanitize = pe.Node(
         SanitizeImage(
@@ -245,7 +229,11 @@ def dmri_qc_workflow(name='dwiMRIQC'):
         (inputnode, dwi_report_wf, [
             ('in_file', 'inputnode.name_source'),
         ]),
-        (inputnode, iqms_wf, [('in_file', 'inputnode.in_file')]),
+        (inputnode, iqms_wf, [
+            ('in_file', 'inputnode.in_file'),
+            ('metadata', 'inputnode.metadata'),
+            ('entities', 'inputnode.entities'),
+        ]),
         (inputnode, sanitize, [('in_file', 'in_file')]),
         (sanitize, dwi_ref, [('out_file', 'in_file')]),
         (sanitize, sp_mask, [('out_file', 'in_file')]),
@@ -335,7 +323,6 @@ def compute_iqms(name='ComputeIQMs'):
         wf = compute_iqms()
 
     """
-    from niworkflows.interfaces.bids import ReadSidecarJSON
 
     from mriqc.interfaces import IQMFileSink
     from mriqc.interfaces.diffusion import DiffusionQC
@@ -348,6 +335,8 @@ def compute_iqms(name='ComputeIQMs'):
         niu.IdentityInterface(
             fields=[
                 'in_file',
+                'metadata',
+                'entities',
                 'in_shells',
                 'n_shells',
                 'b_values_file',
@@ -377,7 +366,6 @@ def compute_iqms(name='ComputeIQMs'):
         niu.IdentityInterface(
             fields=[
                 'out_file',
-                'meta_sidecar',
                 'noise_floor',
             ]
         ),
@@ -389,8 +377,6 @@ def compute_iqms(name='ComputeIQMs'):
         name='estimate_sigma',
     )
 
-    meta = pe.Node(ReadSidecarJSON(index_db=config.execution.bids_database_dir), name='metadata')
-
     measures = pe.Node(DiffusionQC(), name='measures')
 
     addprov = pe.Node(
@@ -413,10 +399,11 @@ def compute_iqms(name='ComputeIQMs'):
     # fmt: off
     workflow.connect([
         (inputnode, datasink, [('in_file', 'in_file'),
+                               ('entities', 'entities'),
+                               (('metadata', _filter_metadata), 'metadata'),
                                ('n_shells', 'NumberOfShells'),
                                ('b_values_shells', 'bValuesEstimation'),
                                (('b_values_file', _bvals_report), 'bValues')]),
-        (inputnode, meta, [('in_file', 'in_file')]),
         (inputnode, measures, [('in_file', 'in_file'),
                                ('b_values_file', 'in_bval_file'),
                                ('b_values_shells', 'in_shells_bval'),
@@ -439,15 +426,7 @@ def compute_iqms(name='ComputeIQMs'):
                               ('piesno_sigma', 'piesno_sigma')]),
         (inputnode, addprov, [('in_file', 'in_file')]),
         (addprov, datasink, [('out_prov', 'provenance')]),
-        (meta, datasink, [('subject', 'subject_id'),
-                          ('session', 'session_id'),
-                          ('task', 'task_id'),
-                          ('acquisition', 'acq_id'),
-                          ('reconstruction', 'rec_id'),
-                          ('run', 'run_id'),
-                          (('out_dict', _filter_metadata), 'metadata')]),
         (datasink, outputnode, [('out_file', 'out_file')]),
-        (meta, outputnode, [('out_dict', 'meta_sidecar')]),
         (measures, datasink, [('out_qc', 'root')]),
         (inputnode, estimate_sigma, [('in_noise', 'in_file'),
                                      ('brain_mask', 'mask')]),
@@ -676,23 +655,23 @@ def epi_mni_align(name='SpatialNormalization'):
 
 
 def _mean(inlist):
-    import numpy as np
+    from numpy import mean
 
-    return np.mean(inlist)
+    return mean(inlist)
 
 
 def _parse_tqual(in_file):
-    import numpy as np
+    from numpy import mean
 
     with open(in_file) as fin:
         lines = fin.readlines()
-    return np.mean([float(line.strip()) for line in lines if not line.startswith('++')])
+    return mean([float(line.strip()) for line in lines if not line.startswith('++')])
 
 
 def _parse_tout(in_file):
-    import numpy as np
+    from numpy import loadtxt
 
-    data = np.loadtxt(in_file)  # pylint: disable=no-member
+    data = loadtxt(in_file)  # pylint: disable=no-member
     return data.mean()
 
@@ -701,9 +680,9 @@ def _tolist(value):
 
 
 def _get_bvals(bmatrix):
-    import numpy as np
+    from numpy import squeeze
 
-    return np.squeeze(bmatrix[:, -1]).tolist()
+    return squeeze(bmatrix[:, -1]).tolist()
 
 
 def _first(inlist):
@@ -722,11 +701,11 @@ def _all_but_first(inlist):
 
 
 def _estimate_sigma(in_file, mask):
     import nibabel as nb
-    import numpy as np
+    from numpy import median
 
     msk = nb.load(mask).get_fdata() > 0.5
     return round(
-        float(np.median(nb.load(in_file).get_fdata()[msk])),
+        float(median(nb.load(in_file).get_fdata()[msk])),
         6,
     )
diff --git a/mriqc/workflows/functional/base.py b/mriqc/workflows/functional/base.py
index ec26cda9f..b55d78b69 100644
--- a/mriqc/workflows/functional/base.py
+++ b/mriqc/workflows/functional/base.py
@@ -43,9 +43,6 @@
 This workflow is orchestrated by :py:func:`fmri_qc_workflow`.
 """
 
-from collections.abc import Iterable
-
-import nibabel as nb
 from nipype.interfaces import utility as niu
 from nipype.pipeline import engine as pe
 from niworkflows.utils.connections import pop_file as _pop
@@ -69,75 +66,43 @@ def fmri_qc_workflow(name='funcMRIQC'):
     """
     from nipype.algorithms.confounds import TSNR, NonSteadyStateDetector
     from nipype.interfaces.afni import TStat
-    from niworkflows.interfaces.bids import ReadSidecarJSON
     from niworkflows.interfaces.header import SanitizeImage
 
     from mriqc.interfaces.functional import SelectEcho
     from mriqc.messages import BUILDING_WORKFLOW
-    from mriqc.utils.misc import _flatten_list as flatten
-
-    workflow = pe.Workflow(name=name)
-
-    mem_gb = config.workflow.biggest_file_gb
-
-    dataset = config.workflow.inputs.get('bold', [])
-
-    if config.execution.datalad_get:
-        from mriqc.utils.misc import _datalad_get
-
-        _datalad_get(dataset)
-
-    full_files = []
-    for bold_path in dataset:
-        try:
-            bold_len = nb.load(
-                bold_path[0]
-                if isinstance(bold_path, Iterable) and not isinstance(bold_path, (str, bytes))
-                else bold_path
-            ).shape[3]
-        except nb.filebasedimages.ImageFileError:
-            bold_len = config.workflow.min_len_bold
-        except IndexError:  # shape has only 3 elements
-            bold_len = 0
-
-        if bold_len >= config.workflow.min_len_bold:
-            full_files.append(bold_path)
-        else:
-            config.loggers.workflow.warn(
-                f'Dismissing {bold_path} for processing: insufficient number of '
-                f'timepoints ({bold_len}) to execute the workflow.'
-            )
 
+    mem_gb = config.workflow.biggest_file_gb['bold']
+    dataset = config.workflow.inputs['bold']
+    metadata = config.workflow.inputs_metadata['bold']
+    entities = config.workflow.inputs_entities['bold']
     message = BUILDING_WORKFLOW.format(
         modality='functional',
-        detail=(
-            f'for {len(full_files)} BOLD runs.'
-            if len(full_files) > 2
-            else f"({' and '.join('<%s>' % v for v in dataset)})."
-        ),
+        detail=f'for {len(dataset)} BOLD runs.',
     )
     config.loggers.workflow.info(message)
 
-    if set(flatten(dataset)) - set(flatten(full_files)):
-        config.workflow.inputs['bold'] = full_files
-        config.to_filename()
-
     # Define workflow, inputs and outputs
     # 0. Get data, put it in RAS orientation
-    inputnode = pe.Node(niu.IdentityInterface(fields=['in_file']), name='inputnode')
-    inputnode.iterables = [('in_file', full_files)]
+    workflow = pe.Workflow(name=name)
+    inputnode = pe.Node(
+        niu.IdentityInterface(
+            fields=['in_file', 'metadata', 'entities'],
+        ),
+        name='inputnode',
+    )
+    inputnode.synchronize = True  # Do not test combinations of iterables
+    inputnode.iterables = [
+        ('in_file', dataset),
+        ('metadata', metadata),
+        ('entities', entities),
+    ]
 
     outputnode = pe.Node(
         niu.IdentityInterface(fields=['qc', 'mosaic', 'out_group', 'out_dvars', 'out_fd']),
         name='outputnode',
     )
 
-    # Get metadata
-    meta = pe.MapNode(
-        ReadSidecarJSON(index_db=config.execution.bids_database_dir),
-        name='metadata',
-        iterfield=['in_file'],
-    )
-
     pick_echo = pe.Node(SelectEcho(), name='pick_echo')
 
     non_steady_state_detector = pe.Node(NonSteadyStateDetector(), name='non_steady_state_detector')
@@ -184,10 +149,9 @@ def fmri_qc_workflow(name='funcMRIQC'):
     # fmt: off
     workflow.connect([
-        (inputnode, meta, [('in_file', 'in_file')]),
-        (inputnode, pick_echo, [('in_file', 'in_files')]),
+        (inputnode, pick_echo, [('in_file', 'in_files'),
+                                ('metadata', 'metadata')]),
         (inputnode, sanitize, [('in_file', 'in_file')]),
-        (meta, pick_echo, [('out_dict', 'metadata')]),
         (pick_echo, non_steady_state_detector, [('out_file', 'in_file')]),
         (non_steady_state_detector, sanitize, [('n_volumes_to_discard', 'n_volumes_to_discard')]),
         (sanitize, hmcwf, [('out_file', 'inputnode.in_file')]),
@@ -195,14 +159,9 @@ def fmri_qc_workflow(name='funcMRIQC'):
         (hmcwf, tsnr, [('outputnode.out_file', 'in_file')]),
         (mean, ema, [(('out_file', _pop), 'inputnode.epi_mean')]),
         # Feed IQMs computation
-        (meta, iqmswf, [('out_dict', 'inputnode.metadata'),
-                        ('subject', 'inputnode.subject'),
-                        ('session', 'inputnode.session'),
-                        ('task', 'inputnode.task'),
-                        ('acquisition', 'inputnode.acquisition'),
-                        ('reconstruction', 'inputnode.reconstruction'),
-                        ('run', 'inputnode.run')]),
-        (inputnode, iqmswf, [('in_file', 'inputnode.in_file')]),
+        (inputnode, iqmswf, [('in_file', 'inputnode.in_file'),
+                             ('metadata', 'inputnode.metadata'),
+                             ('entities', 'inputnode.entities')]),
         (sanitize, iqmswf, [('out_file', 'inputnode.in_ras')]),
         (mean, iqmswf, [('out_file', 'inputnode.epi_mean')]),
         (hmcwf, iqmswf, [('outputnode.out_file', 'inputnode.hmc_epi'),
@@ -213,6 +172,7 @@ def fmri_qc_workflow(name='funcMRIQC'):
         # Feed reportlet generation
         (inputnode, func_report_wf, [
             ('in_file', 'inputnode.name_source'),
+            ('metadata', 'inputnode.meta_sidecar'),
         ]),
         (sanitize, func_report_wf, [('out_file', 'inputnode.in_ras')]),
         (mean, func_report_wf, [('out_file', 'inputnode.epi_mean')]),
@@ -230,7 +190,6 @@ def fmri_qc_workflow(name='funcMRIQC'):
             ('outputnode.out_dvars', 'inputnode.in_dvars'),
             ('outputnode.outliers', 'inputnode.outliers'),
         ]),
-        (meta, func_report_wf, [('out_dict', 'inputnode.meta_sidecar')]),
         (hmcwf, outputnode, [('outputnode.out_fd', 'out_fd')]),
     ])
     # fmt: on
@@ -321,13 +280,15 @@ def compute_iqms(name='ComputeIQMs'):
     from mriqc.interfaces.transitional import GCOR
     from mriqc.workflows.utils import _tofloat, get_fwhmx
 
-    mem_gb = config.workflow.biggest_file_gb
+    mem_gb = config.workflow.biggest_file_gb['bold']
 
     workflow = pe.Workflow(name=name)
     inputnode = pe.Node(
         niu.IdentityInterface(
             fields=[
                 'in_file',
+                'metadata',
+                'entities',
                 'in_ras',
                 'epi_mean',
                 'brainmask',
@@ -335,15 +296,8 @@ def compute_iqms(name='ComputeIQMs'):
                 'hmc_fd',
                 'fd_thres',
                 'in_tsnr',
-                'metadata',
                 'mpars',
                 'exclude_index',
-                'subject',
-                'session',
-                'task',
-                'acquisition',
-                'reconstruction',
-                'run',
             ]
         ),
         name='inputnode',
@@ -467,12 +421,7 @@ def compute_iqms(name='ComputeIQMs'):
         (inputnode, addprov, [('in_file', 'in_file')]),
         (inputnode, datasink, [('in_file', 'in_file'),
                                ('exclude_index', 'dummy_trs'),
-                               (('subject', _pop), 'subject_id'),
-                               (('session', _pop), 'session_id'),
-                               (('task', _pop), 'task_id'),
-                               (('acquisition', _pop), 'acq_id'),
-                               (('reconstruction', _pop), 'rec_id'),
-                               (('run', _pop), 'run_id'),
+                               ('entities', 'entities'),
                                ('metadata', 'metadata')]),
         (addprov, datasink, [('out_prov', 'provenance')]),
         (outliers, datasink, [(('out_file', _parse_tout), 'aor')]),
@@ -557,7 +506,7 @@ def hmc(name='fMRI_HMC', omp_nthreads=None):
     from nipype.algorithms.confounds import FramewiseDisplacement
     from nipype.interfaces.afni import Despike, Refit, Volreg
 
-    mem_gb = config.workflow.biggest_file_gb
+    mem_gb = config.workflow.biggest_file_gb['bold']
 
     workflow = pe.Workflow(name=name)
diff --git a/mriqc/workflows/functional/output.py b/mriqc/workflows/functional/output.py
index 9e589aef3..427b76830 100644
--- a/mriqc/workflows/functional/output.py
+++ b/mriqc/workflows/functional/output.py
@@ -49,7 +49,7 @@ def init_func_report_wf(name='func_report_wf'):
     # from mriqc.interfaces.reports import IndividualReport
 
     verbose = config.execution.verbose_reports
-    mem_gb = config.workflow.biggest_file_gb
+    mem_gb = config.workflow.biggest_file_gb['bold']
     reportlets_dir = config.execution.work_dir / 'reportlets'
 
     workflow = pe.Workflow(name=name)
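
The sketch below is not part of the patch; it is a minimal, illustrative example of how the per-modality fields added to `config.workflow` are expected to flow once `parse_args()` has run. The `'bold'` key and the output filename are assumptions chosen for the example.

# Illustrative only: assumes MRIQC is installed, a BIDS dataset is configured,
# and mriqc.cli.parser.parse_args() has already initialized the config.
from mriqc import config
from mriqc.utils.misc import initialize_meta_and_data

# parse_args() calls this itself; shown here to make the data flow explicit.
initialize_meta_and_data()  # datalad-get (if enabled), then metadata/entities/size extraction

bold_runs = config.workflow.inputs['bold']                # file paths (or multi-echo groups)
bold_metadata = config.workflow.inputs_metadata['bold']   # sidecar metadata, one item per run
bold_entities = config.workflow.inputs_entities['bold']   # BIDS entities, one dict per run
mem_gb = config.workflow.biggest_file_gb['bold']          # largest BOLD file, in GB

# With store_inputs=True (the default), paths/metadata/entities are pickled to
# workflow.inputs_path so that workflow.init() can restore them in other processes.
config.to_filename(config.execution.log_dir / 'config-example.toml', store_inputs=True)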