Commit

enh: write a parallelized routine to extract file metadata and test file length
oesteban committed Aug 12, 2024
1 parent a6a2ba8 commit ef40015
Showing 4 changed files with 247 additions and 58 deletions.
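
Editor's note: at its core, this commit replaces the synchronous size scan in the CLI parser with an asyncio-based routine in mriqc/utils/misc.py. Each blocking metadata read is wrapped in a job and dispatched to the default thread-pool executor, with a semaphore capping concurrency. Below is a minimal, self-contained sketch of that pattern; the read_metadata job, extract_all wrapper, and example paths are illustrative placeholders, not part of this commit.

import asyncio
import os
from functools import partial


async def worker(job, semaphore):
    # Run the blocking callable in the default thread-pool executor,
    # admitting at most `max_concurrent` jobs at a time via the semaphore.
    async with semaphore:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, job)


def read_metadata(path):
    # Placeholder for a blocking job (e.g., nibabel/pybids metadata extraction)
    return {'path': path, 'size_gb': os.path.getsize(path) / (1024**3)}


async def extract_all(paths, max_concurrent=4):
    semaphore = asyncio.Semaphore(max_concurrent)
    tasks = [
        asyncio.create_task(worker(partial(read_metadata, p), semaphore))
        for p in paths
    ]
    # gather() preserves the order of the input tasks
    return await asyncio.gather(*tasks)


# results = asyncio.run(extract_all(['sub-01_T1w.nii.gz', 'sub-02_T1w.nii.gz']))

Offloading to the executor keeps the event loop responsive while the work itself (file I/O and JSON parsing) runs in threads, which suits these I/O-bound jobs.
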
21 changes: 2 additions & 19 deletions mriqc/cli/parser.py
@@ -490,6 +490,7 @@ def parse_args(args=None, namespace=None):
from mriqc import __version__
from mriqc._warnings import DATE_FMT, LOGGER_FMT, _LogFormatter
from mriqc.messages import PARTICIPANT_START
from mriqc.utils.misc import initialize_meta_and_data

parser = _build_parser()
opts = parser.parse_args(args, namespace)
@@ -642,11 +643,7 @@ def parse_args(args=None, namespace=None):
f'MRIQC is unable to process the following modalities: {", ".join(unknown_mods)}.'
)

# Estimate the biggest file size / leave 1GB if some file does not exist (datalad)
with suppress(FileNotFoundError):
config.workflow.biggest_file_gb = _get_biggest_file_size_gb(
config.workflow.inputs.values()
)
initialize_meta_and_data()

# set specifics for alternative populations
if opts.species.lower() != 'human':
@@ -660,17 +657,3 @@ def parse_args(args=None, namespace=None):
config.workflow.fd_radius = 7.5
# block uploads for the moment; can be reversed before wider release
config.execution.no_sub = True


def _get_biggest_file_size_gb(files):
"""Identify the largest file size (allows multi-echo groups)."""

import os

sizes = []
for file in files:
if isinstance(file, (list, tuple)):
sizes.append(_get_biggest_file_size_gb(file))
else:
sizes.append(os.path.getsize(file))
return max(sizes) / (1024**3)
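
Editor's note: with this change, parse_args() no longer computes a single biggest-file estimate via _get_biggest_file_size_gb(); initialize_meta_and_data() populates per-modality dictionaries on the config instead. The sketch below only illustrates what gets populated — in normal use parse_args() already makes this call, so a standalone invocation like this is not part of the commit.

from mriqc import config
from mriqc.utils.misc import initialize_meta_and_data

# Assumes config.workflow.inputs has already been populated (e.g., by parse_args())
initialize_meta_and_data()

# biggest_file_gb is now a mapping keyed by modality, not a single float
for mod, size_gb in config.workflow.biggest_file_gb.items():
    print(f'{mod}: largest input is {size_gb:.2f} GB')

Any downstream consumer of biggest_file_gb (e.g., memory estimation) would therefore need to index it by modality after this commit.
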
241 changes: 238 additions & 3 deletions mriqc/utils/misc.py
@@ -22,18 +22,28 @@
#
"""Helper functions."""

from __future__ import annotations

import asyncio
import json
from collections import OrderedDict
from collections.abc import Iterable
from functools import partial
from os import cpu_count
from pathlib import Path
from typing import Callable, TypeVar

import nibabel as nb
import numpy as np
import pandas as pd

try:
    from collections.abc import MutableMapping
except ImportError:  # fallback for Python versions predating collections.abc
    from collections import MutableMapping

R = TypeVar('R')

IMTYPES = {
'T1w': 'anat',
'T2w': 'anat',
@@ -58,6 +68,11 @@
(_rec-(?P<rec_id>[a-zA-Z0-9]+))?(_run-(?P<run_id>[a-zA-Z0-9]+))?\
"""

async def worker(job: Callable[[], R], semaphore) -> R:
    """Run a blocking job in the default thread-pool executor, bounded by *semaphore*."""
    async with semaphore:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, job)


def reorder_csv(csv_file, out_file=None):
"""
@@ -168,7 +183,7 @@ def generate_pred(derivatives_dir, output_dir, mod):
# Drop duplicates
dataframe.drop_duplicates(bdits_cols, keep='last', inplace=True)

out_csv = Path(output_dir) / ('%s_predicted_qa_csv' % mod)
out_csv = Path(output_dir) / f'{mod}_predicted_qa_csv'
dataframe[bdits_cols + ['mriqc_pred']].to_csv(str(out_csv), index=False)
return out_csv

@@ -179,7 +194,7 @@ def generate_tsv(output_dir, mod):
"""

# If some were found, generate the CSV file and group report
out_tsv = output_dir / ('group_%s.tsv' % mod)
out_tsv = output_dir / (f'group_{mod}.tsv')
jsonfiles = list(output_dir.glob(f'sub-*/**/{IMTYPES[mod]}/sub-*_{mod}.json'))
if not jsonfiles:
return None, out_tsv
@@ -249,7 +264,10 @@ def _flatten_list(xs):
def _datalad_get(input_list, nprocs=None):
from mriqc import config

if not config.execution.bids_dir_datalad:
if (
not config.execution.bids_dir_datalad
or not config.execution.datalad_get
):
return

# Delay datalad import until we're sure we'll need it
@@ -273,3 +291,220 @@ def _datalad_get(input_list, nprocs=None):
config.nipype.nprocs,
),
)


def _file_meta_and_size(
files: list | str,
volmin: int | None = 1,
volmax: int | None = None,
):
"""
Identify the largest file size (allows multi-echo groups).
Parameters
----------
files : :obj:`list`
List of :obj:`os.pathlike` or sublist of :obj:`os.pathlike` (multi-echo case)
of files to be extracted.
volmin : :obj:`int`
Minimum number of volumes that inputs must have.
volmax : :obj:`int`
Maximum number of volumes that inputs must have.
Returns
-------
:obj:`tuple`
A tuple (metadata, entities, sizes, valid) of items containing the different
aspects extracted from the input(s).
"""

import os

from mriqc import config

multifile = isinstance(files, (list, tuple))
if multifile:
metadata = []
entities = []
_size_list = []
_valid_list = []

for filename in files:
metadata_i, entities_i, sizes_i, valid_i = _file_meta_and_size(
filename,
volmin=volmin,
volmax=volmax,
)

# Add to output lists
metadata.append(metadata_i)
entities.append(entities_i)
_size_list.append(sizes_i)
_valid_list.append(valid_i)

valid = (
all(_valid_list)
and len({_m['NumberOfVolumes'] for _m in metadata}) == 1
)
return metadata, entities, np.sum(_size_list), valid

metadata = config.execution.layout.get_metadata(files)
entities = config.execution.layout.parse_file_entities(files)
size = os.path.getsize(files) / (1024**3)

metadata['FileSize'] = size
metadata['FileSizeUnits'] = 'GB'

try:
nii = nb.load(files)
nifti_len = nii.shape[3]
except nb.filebasedimages.ImageFileError:
nifti_len = None
except IndexError: # shape has only 3 elements
nifti_len = 1 if nii.dataobj.ndim == 3 else -1

valid = True
if volmin is not None:
valid = nifti_len >= volmin

if valid and volmax is not None:
valid = nifti_len <= volmax

metadata['NumberOfVolumes'] = nifti_len

return metadata, entities, size, valid


async def _extract_meta_and_size(
filelist: list,
volmin: int | None = 1,
volmax: int | None = None,
max_concurrent: int = min(cpu_count(), 12),
) -> tuple[list, list, list, list]:
"""
Extract corresponding metadata and file size in GB.
Parameters
----------
filelist : :obj:`list`
List of :obj:`os.pathlike` or sublist of :obj:`os.pathlike` (multi-echo case)
of files to be extracted.
volmin : :obj:`int`
Minimum number of volumes that inputs must have.
volmax : :obj:`int`
Maximum number of volumes that inputs must have.
max_concurrent : :obj:`int`
Maximum number of concurrent coroutines (files or multi-echo sets).
Returns
-------
:obj:`tuple`
A tuple (metadata, entities, sizes, valid) of lists containing the different
aspects extracted from inputs.
"""

semaphore = asyncio.Semaphore(max_concurrent)
tasks = []
for filename in filelist:
tasks.append(
asyncio.create_task(
worker(
partial(
_file_meta_and_size,
filename,
volmin=volmin,
volmax=volmax,
),
semaphore,
)
)
)

# Gather guarantees the order of the output
metadata, entities, sizes, valid = list(zip(*await asyncio.gather(*tasks)))
return metadata, entities, sizes, valid


def initialize_meta_and_data(
max_concurrent: int = min(cpu_count(), 12),
) -> None:
"""
Mine data and metadata corresponding to the dataset.
Get files if datalad enabled and extract the necessary metadata.
Parameters
----------
max_concurrent : :obj:`int`
Maximum number of concurrent coroutines (files or multi-echo sets).
Returns
-------
:obj:`None`
"""
from mriqc import config

# Datalad-get all files
dataset = config.workflow.inputs.values()
_datalad_get(dataset)

# Extract metadata and filesize
config.workflow.input_metadata = {}
config.workflow.input_entities = {}
config.workflow.biggest_file_gb = {}
for mod, input_list in config.workflow.inputs.items():
config.loggers.cli.log(
25,
f"Extracting metadata and entities for {len(input_list)} input runs "
f"of modality '{mod}'...",
)

# Some modalities require a minimum number of volumes
volmin = None
if mod == 'bold':
volmin = config.workflow.min_len_bold
elif mod == 'dwi':
volmin = config.workflow.min_len_dwi

# Some modalities require a maximum number of volumes
volmax = None
if mod in ('T1w', 'T2w'):
volmax = 1

        # Run extraction in an asyncio event loop
metadata, entities, size, valid = asyncio.run(
_extract_meta_and_size(
input_list,
max_concurrent=max_concurrent,
volmin=volmin,
volmax=volmax,
)
)

# Identify nonconformant files that need to be dropped (and drop them)
if (num_dropped := len(input_list) - np.sum(valid)):
            config.loggers.workflow.warning(
                f'{num_dropped} {mod} runs cannot be processed (too few or too many volumes)'
            )

filtered_results = [
_v[:-1] for _v in zip(input_list, metadata, entities, size, valid)
if _v[-1] is True
]
input_list, metadata, entities, size = list(zip(*filtered_results))
config.workflow.inputs[mod] = input_list

# Finalizing (write to config so that values are propagated)
_max_size = np.max(size)
config.workflow.input_metadata[mod] = metadata
config.workflow.input_entities[mod] = entities
config.workflow.biggest_file_gb[mod] = float(_max_size) # Cast required to store YAML

config.loggers.cli.log(
25,
f"File size ('{mod}'): {_max_size:.2f}|{np.mean(size):.2f} "
"GB [maximum|average].",
)
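
Editor's note: besides metadata, the new routine validates run length — the fourth dimension of each input is checked against per-modality bounds (config.workflow.min_len_bold / min_len_dwi as minima, and a maximum of one volume for T1w/T2w), and nonconforming runs are dropped before the workflow is built. The snippet below is only an illustrative check of a single file against such bounds using nibabel directly; the n_volumes helper, file name, and thresholds are placeholders.

import nibabel as nb


def n_volumes(path):
    # Length of the 4th dimension; a 3D image counts as a single volume
    img = nb.load(path)
    return img.shape[3] if img.dataobj.ndim == 4 else 1


# nvols = n_volumes('sub-01_task-rest_bold.nii.gz')
# valid = nvols >= 5             # e.g., config.workflow.min_len_bold
# valid = valid and nvols <= 1   # e.g., the anatomical maximum for T1w/T2w
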