From ef40015f5d91fb1a0f5a5589cd09173ce72f86ba Mon Sep 17 00:00:00 2001
From: Oscar Esteban
Date: Sat, 10 Aug 2024 11:25:15 +0200
Subject: [PATCH] enh: write a parallelized routine to extract file metadata
 and test file length

---
 mriqc/cli/parser.py                  |  21 +--
 mriqc/utils/misc.py                  | 241 ++++++++++++++++++++++++++-
 mriqc/workflows/functional/base.py   |  41 +----
 mriqc/workflows/functional/output.py |   2 +-
 4 files changed, 247 insertions(+), 58 deletions(-)

diff --git a/mriqc/cli/parser.py b/mriqc/cli/parser.py
index 6d24b8fa5..d41817360 100644
--- a/mriqc/cli/parser.py
+++ b/mriqc/cli/parser.py
@@ -490,6 +490,7 @@ def parse_args(args=None, namespace=None):
     from mriqc import __version__
     from mriqc._warnings import DATE_FMT, LOGGER_FMT, _LogFormatter
     from mriqc.messages import PARTICIPANT_START
+    from mriqc.utils.misc import initialize_meta_and_data
 
     parser = _build_parser()
     opts = parser.parse_args(args, namespace)
@@ -642,11 +643,7 @@ def parse_args(args=None, namespace=None):
             f'MRIQC is unable to process the following modalities: {", ".join(unknown_mods)}.'
         )
 
-    # Estimate the biggest file size / leave 1GB if some file does not exist (datalad)
-    with suppress(FileNotFoundError):
-        config.workflow.biggest_file_gb = _get_biggest_file_size_gb(
-            config.workflow.inputs.values()
-        )
+    initialize_meta_and_data()
 
     # set specifics for alternative populations
     if opts.species.lower() != 'human':
@@ -660,17 +657,3 @@ def parse_args(args=None, namespace=None):
         config.workflow.fd_radius = 7.5
         # block uploads for the moment; can be reversed before wider release
         config.execution.no_sub = True
-
-
-def _get_biggest_file_size_gb(files):
-    """Identify the largest file size (allows multi-echo groups)."""
-
-    import os
-
-    sizes = []
-    for file in files:
-        if isinstance(file, (list, tuple)):
-            sizes.append(_get_biggest_file_size_gb(file))
-        else:
-            sizes.append(os.path.getsize(file))
-    return max(sizes) / (1024**3)
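The helper removed above recursed because a BOLD input may be a multi-echo set, i.e., a list whose items are themselves lists of per-echo files. A runnable restatement of its semantics, with made-up nesting and byte counts (`biggest_gb` and the numbers are illustrative, not part of the codebase):

    def biggest_gb(sizes):
        # Recursive maximum over possibly nested byte counts, reported in GB
        return max(
            biggest_gb(s) if isinstance(s, (list, tuple)) else s / 1024**3
            for s in sizes
        )

    nested = [350_000_000, [420_000_000, 415_000_000]]  # one run + one multi-echo pair
    print(f'{biggest_gb(nested):.3f}')  # -> 0.391

Its replacement, `initialize_meta_and_data()` (added to `mriqc/utils/misc.py` below), behaves slightly differently: the echoes of a multi-echo set are summed before the per-modality maximum is taken, and the result is stored per modality rather than as one global scalar.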
diff --git a/mriqc/utils/misc.py b/mriqc/utils/misc.py
index 269b9b670..04b056ab6 100644
--- a/mriqc/utils/misc.py
+++ b/mriqc/utils/misc.py
@@ -22,11 +22,19 @@
 #
 """Helper functions."""
 
+from __future__ import annotations
+
+import asyncio
 import json
 from collections import OrderedDict
 from collections.abc import Iterable
+from functools import partial
+from os import cpu_count
 from pathlib import Path
+from typing import Callable, TypeVar
 
+import nibabel as nb
+import numpy as np
 import pandas as pd
 
 try:
@@ -34,6 +42,8 @@
 except ImportError:
     from collections.abc import MutableMapping
 
+R = TypeVar('R')
+
 IMTYPES = {
     'T1w': 'anat',
     'T2w': 'anat',
@@ -58,6 +68,11 @@
 (_rec-(?P<rec_id>[a-zA-Z0-9]+))?(_run-(?P<run_id>[a-zA-Z0-9]+))?\
 """
 
+async def worker(job: Callable[[], R], semaphore) -> R:
+    async with semaphore:
+        loop = asyncio.get_running_loop()
+        return await loop.run_in_executor(None, job)
+
 
 def reorder_csv(csv_file, out_file=None):
     """
@@ -168,7 +183,7 @@ def generate_pred(derivatives_dir, output_dir, mod):
 
     # Drop duplicates
     dataframe.drop_duplicates(bdits_cols, keep='last', inplace=True)
 
-    out_csv = Path(output_dir) / ('%s_predicted_qa_csv' % mod)
+    out_csv = Path(output_dir) / f'{mod}_predicted_qa_csv'
     dataframe[bdits_cols + ['mriqc_pred']].to_csv(str(out_csv), index=False)
     return out_csv
@@ -179,7 +194,7 @@ def generate_tsv(output_dir, mod):
     """
 
     # If some were found, generate the CSV file and group report
-    out_tsv = output_dir / ('group_%s.tsv' % mod)
+    out_tsv = output_dir / f'group_{mod}.tsv'
     jsonfiles = list(output_dir.glob(f'sub-*/**/{IMTYPES[mod]}/sub-*_{mod}.json'))
     if not jsonfiles:
         return None, out_tsv
@@ -249,7 +264,10 @@ def _flatten_list(xs):
 def _datalad_get(input_list, nprocs=None):
     from mriqc import config
 
-    if not config.execution.bids_dir_datalad:
+    if (
+        not config.execution.bids_dir_datalad
+        or not config.execution.datalad_get
+    ):
         return
 
     # Delay datalad import until we're sure we'll need it
@@ -273,3 +291,220 @@
             config.nipype.nprocs,
         ),
     )
+
+
+def _file_meta_and_size(
+    files: list | str,
+    volmin: int | None = 1,
+    volmax: int | None = None,
+):
+    """
+    Extract metadata, BIDS entities, size, and validity of one input (or multi-echo set).
+
+    Parameters
+    ----------
+    files : :obj:`str` or :obj:`list`
+        A single path, or a list of paths (multi-echo case),
+        from which metadata and size are extracted.
+    volmin : :obj:`int`
+        Minimum number of volumes that inputs must have.
+    volmax : :obj:`int`
+        Maximum number of volumes that inputs must have.
+
+    Returns
+    -------
+    :obj:`tuple`
+        A tuple (metadata, entities, sizes, valid) of items containing the different
+        aspects extracted from the input(s).
+
+    """
+
+    import os
+
+    from mriqc import config
+
+    multifile = isinstance(files, (list, tuple))
+    if multifile:
+        metadata = []
+        entities = []
+        _size_list = []
+        _valid_list = []
+
+        for filename in files:
+            metadata_i, entities_i, sizes_i, valid_i = _file_meta_and_size(
+                filename,
+                volmin=volmin,
+                volmax=volmax,
+            )
+
+            # Add to output lists
+            metadata.append(metadata_i)
+            entities.append(entities_i)
+            _size_list.append(sizes_i)
+            _valid_list.append(valid_i)
+
+        # Valid only if every echo is valid and all report the same length
+        valid = (
+            all(_valid_list)
+            and len({_m['NumberOfVolumes'] for _m in metadata}) == 1
+        )
+        return metadata, entities, np.sum(_size_list), valid
+
+    metadata = config.execution.layout.get_metadata(files)
+    entities = config.execution.layout.parse_file_entities(files)
+    size = os.path.getsize(files) / (1024**3)
+
+    metadata['FileSize'] = size
+    metadata['FileSizeUnits'] = 'GB'
+
+    try:
+        nii = nb.load(files)
+        nifti_len = nii.shape[3]
+    except nb.filebasedimages.ImageFileError:
+        nifti_len = None
+    except IndexError:  # shape has only 3 elements
+        nifti_len = 1 if nii.dataobj.ndim == 3 else -1
+
+    valid = True
+    if volmin is not None:
+        valid = nifti_len is not None and nifti_len >= volmin
+
+    if valid and volmax is not None:
+        valid = nifti_len is not None and nifti_len <= volmax
+
+    metadata['NumberOfVolumes'] = nifti_len
+
+    return metadata, entities, size, valid
+
+
+async def _extract_meta_and_size(
+    filelist: list,
+    volmin: int | None = 1,
+    volmax: int | None = None,
+    max_concurrent: int = min(cpu_count() or 1, 12),
+) -> tuple[list, list, list, list]:
+    """
+    Extract corresponding metadata and file size in GB.
+
+    Parameters
+    ----------
+    filelist : :obj:`list`
+        List of :obj:`os.pathlike` or sublist of :obj:`os.pathlike` (multi-echo case)
+        of files to be extracted.
+    volmin : :obj:`int`
+        Minimum number of volumes that inputs must have.
+    volmax : :obj:`int`
+        Maximum number of volumes that inputs must have.
+    max_concurrent : :obj:`int`
+        Maximum number of concurrent coroutines (files or multi-echo sets).
+
+    Returns
+    -------
+    :obj:`tuple`
+        A tuple (metadata, entities, sizes, valid) of lists containing the different
+        aspects extracted from inputs.
+
+    """
+
+    semaphore = asyncio.Semaphore(max_concurrent)
+    tasks = []
+    for filename in filelist:
+        tasks.append(
+            asyncio.create_task(
+                worker(
+                    partial(
+                        _file_meta_and_size,
+                        filename,
+                        volmin=volmin,
+                        volmax=volmax,
+                    ),
+                    semaphore,
+                )
+            )
+        )
+
+    # Gather guarantees the order of the output
+    metadata, entities, sizes, valid = list(zip(*await asyncio.gather(*tasks)))
+    return metadata, entities, sizes, valid
+
+
+def initialize_meta_and_data(
+    max_concurrent: int = min(cpu_count() or 1, 12),
+) -> None:
+    """
+    Mine data and metadata corresponding to the dataset.
+
+    Fetch files with DataLad, if enabled, and extract the necessary metadata.
+
+    Parameters
+    ----------
+    max_concurrent : :obj:`int`
+        Maximum number of concurrent coroutines (files or multi-echo sets).
+
+    Returns
+    -------
+    :obj:`None`
+
+    """
+    from mriqc import config
+
+    # Datalad-get all files
+    dataset = config.workflow.inputs.values()
+    _datalad_get(dataset)
+
+    # Extract metadata and filesize
+    config.workflow.input_metadata = {}
+    config.workflow.input_entities = {}
+    config.workflow.biggest_file_gb = {}
+    for mod, input_list in config.workflow.inputs.items():
+        config.loggers.cli.log(
+            25,
+            f"Extracting metadata and entities for {len(input_list)} input runs "
+            f"of modality '{mod}'...",
+        )
+
+        # Some modalities require a minimum number of volumes
+        volmin = None
+        if mod == 'bold':
+            volmin = config.workflow.min_len_bold
+        elif mod == 'dwi':
+            volmin = config.workflow.min_len_dwi
+
+        # Some modalities require a maximum number of volumes
+        volmax = None
+        if mod in ('T1w', 'T2w'):
+            volmax = 1
+
+        # Run extraction in an asyncio coroutine loop
+        metadata, entities, size, valid = asyncio.run(
+            _extract_meta_and_size(
+                input_list,
+                max_concurrent=max_concurrent,
+                volmin=volmin,
+                volmax=volmax,
+            )
+        )
+
+        # Identify nonconformant files that need to be dropped (and drop them)
+        if (num_dropped := len(input_list) - np.sum(valid)):
+            config.loggers.workflow.warning(
+                f'{num_dropped} runs cannot be processed (too short or too long).'
+            )
+
+            filtered_results = [
+                _v[:-1]
+                for _v in zip(input_list, metadata, entities, size, valid)
+                if _v[-1] is True
+            ]
+            input_list, metadata, entities, size = list(zip(*filtered_results))
+            config.workflow.inputs[mod] = input_list
+
+        # Finalizing (write to config so that values are propagated)
+        _max_size = np.max(size)
+        config.workflow.input_metadata[mod] = metadata
+        config.workflow.input_entities[mod] = entities
+        config.workflow.biggest_file_gb[mod] = float(_max_size)  # Cast required to store YAML
+
+        config.loggers.cli.log(
+            25,
+            f"File size ('{mod}'): {_max_size:.2f}|{np.mean(size):.2f} "
+            "GB [maximum|average].",
+        )
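The core of the new module is the `worker`/semaphore pattern above: each blocking per-file extraction job is pushed onto the default thread-pool executor, a semaphore caps how many run at once, and `asyncio.gather` returns results in submission order. A self-contained sketch of the same pattern with a stand-in job (all names below are illustrative, not part of the patch):

    import asyncio
    from functools import partial

    async def worker(job, semaphore):
        async with semaphore:
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(None, job)  # run blocking job off the event loop

    def job(path):  # stand-in for the blocking _file_meta_and_size()
        return len(path)

    async def extract(paths, max_concurrent=4):
        semaphore = asyncio.Semaphore(max_concurrent)
        tasks = [asyncio.create_task(worker(partial(job, p), semaphore)) for p in paths]
        return await asyncio.gather(*tasks)  # output order matches `paths`

    print(asyncio.run(extract(['a.nii.gz', 'ab.nii.gz'])))  # -> [8, 9]

Threads are a reasonable fit here because the jobs are largely I/O-bound (file stats, BIDS-layout queries, NIfTI header reads); no process pool is needed.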
diff --git a/mriqc/workflows/functional/base.py b/mriqc/workflows/functional/base.py
index ec26cda9f..30f2c4086 100644
--- a/mriqc/workflows/functional/base.py
+++ b/mriqc/workflows/functional/base.py
@@ -78,53 +78,24 @@ def fmri_qc_workflow(name='funcMRIQC'):
 
     workflow = pe.Workflow(name=name)
 
-    mem_gb = config.workflow.biggest_file_gb
+    mem_gb = config.workflow.biggest_file_gb['bold']
 
     dataset = config.workflow.inputs.get('bold', [])
 
-    if config.execution.datalad_get:
-        from mriqc.utils.misc import _datalad_get
-
-        _datalad_get(dataset)
-
-    full_files = []
-    for bold_path in dataset:
-        try:
-            bold_len = nb.load(
-                bold_path[0]
-                if isinstance(bold_path, Iterable) and not isinstance(bold_path, (str, bytes))
-                else bold_path
-            ).shape[3]
-        except nb.filebasedimages.ImageFileError:
-            bold_len = config.workflow.min_len_bold
-        except IndexError:  # shape has only 3 elements
-            bold_len = 0
-        if bold_len >= config.workflow.min_len_bold:
-            full_files.append(bold_path)
-        else:
-            config.loggers.workflow.warn(
-                f'Dismissing {bold_path} for processing: insufficient number of '
-                f'timepoints ({bold_len}) to execute the workflow.'
-            )
-
     message = BUILDING_WORKFLOW.format(
         modality='functional',
         detail=(
-            f'for {len(full_files)} BOLD runs.'
-            if len(full_files) > 2
+            f'for {len(dataset)} BOLD runs.'
+            if len(dataset) > 2
             else f"({' and '.join('<%s>' % v for v in dataset)})."
         ),
     )
     config.loggers.workflow.info(message)
 
-    if set(flatten(dataset)) - set(flatten(full_files)):
-        config.workflow.inputs['bold'] = full_files
-        config.to_filename()
-
     # Define workflow, inputs and outputs
     # 0. Get data, put it in RAS orientation
     inputnode = pe.Node(niu.IdentityInterface(fields=['in_file']), name='inputnode')
-    inputnode.iterables = [('in_file', full_files)]
+    inputnode.iterables = [('in_file', dataset)]
 
     outputnode = pe.Node(
         niu.IdentityInterface(fields=['qc', 'mosaic', 'out_group', 'out_dvars', 'out_fd']),
@@ -321,7 +292,7 @@ def compute_iqms(name='ComputeIQMs'):
     from mriqc.interfaces.transitional import GCOR
     from mriqc.workflows.utils import _tofloat, get_fwhmx
 
-    mem_gb = config.workflow.biggest_file_gb
+    mem_gb = config.workflow.biggest_file_gb['bold']
 
     workflow = pe.Workflow(name=name)
     inputnode = pe.Node(
@@ -557,7 +528,7 @@ def hmc(name='fMRI_HMC', omp_nthreads=None):
     from nipype.algorithms.confounds import FramewiseDisplacement
     from nipype.interfaces.afni import Despike, Refit, Volreg
 
-    mem_gb = config.workflow.biggest_file_gb
+    mem_gb = config.workflow.biggest_file_gb['bold']
 
     workflow = pe.Workflow(name=name)
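The per-run length screening deleted here now happens once, at CLI time, inside `initialize_meta_and_data()`. The keep/unzip idiom that function uses to drop invalid runs is compact but easy to misread; a toy illustration with made-up values:

    input_list = ['a.nii.gz', 'b.nii.gz', 'c.nii.gz']
    sizes = [0.5, 0.1, 0.7]
    valid = [True, False, True]

    # Keep rows whose last element is True, then unzip back into parallel columns
    kept = [row[:-1] for row in zip(input_list, sizes, valid) if row[-1]]
    input_list, sizes = (list(col) for col in zip(*kept))
    print(input_list, sizes)  # -> ['a.nii.gz', 'c.nii.gz'] [0.5, 0.7]

One caveat: if every run of a modality were dropped, `zip(*kept)` over an empty list would fail to unpack, so the routine appears to assume at least one valid run per modality.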
diff --git a/mriqc/workflows/functional/output.py b/mriqc/workflows/functional/output.py
index 7ee734e30..ccb44e711 100644
--- a/mriqc/workflows/functional/output.py
+++ b/mriqc/workflows/functional/output.py
@@ -49,7 +49,7 @@ def init_func_report_wf(name='func_report_wf'):
     # from mriqc.interfaces.reports import IndividualReport
 
     verbose = config.execution.verbose_reports
-    mem_gb = config.workflow.biggest_file_gb
+    mem_gb = config.workflow.biggest_file_gb['bold']
     reportlets_dir = config.execution.work_dir / 'reportlets'
 
     workflow = pe.Workflow(name=name)
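For completeness, the validity flags that drive all of the above come from counting volumes in the NIfTI header. A sketch of that check on an in-memory image (the threshold of 5 is illustrative; the real bounds come from `config.workflow.min_len_bold`, `config.workflow.min_len_dwi`, and the one-volume maximum for T1w/T2w):

    import nibabel as nb
    import numpy as np

    img = nb.Nifti1Image(np.zeros((2, 2, 2, 50), dtype='float32'), np.eye(4))
    n_volumes = img.shape[3] if img.dataobj.ndim == 4 else 1  # 3D images count as one volume
    print(n_volumes >= 5)  # -> True: long enough for a BOLD-like minimum of 5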