From 8f4ec34f4637e377126b4883ae3da1999fd8fddf Mon Sep 17 00:00:00 2001 From: Chris Markiewicz Date: Thu, 22 Aug 2024 15:50:06 -0400 Subject: [PATCH] RF: Switch from DerivativesDataSinks to Prep/Save nodes --- fmriprep/interfaces/__init__.py | 10 - fmriprep/workflows/base.py | 44 ++- fmriprep/workflows/bold/base.py | 77 +++-- fmriprep/workflows/bold/confounds.py | 73 +++-- fmriprep/workflows/bold/outputs.py | 387 +++++++++++++++++--------- fmriprep/workflows/bold/resampling.py | 31 ++- 6 files changed, 418 insertions(+), 204 deletions(-) diff --git a/fmriprep/interfaces/__init__.py b/fmriprep/interfaces/__init__.py index 9fe92fce9..e69de29bb 100644 --- a/fmriprep/interfaces/__init__.py +++ b/fmriprep/interfaces/__init__.py @@ -1,10 +0,0 @@ -# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- -# vi: set ft=python sts=4 ts=4 sw=4 et: -from niworkflows.interfaces.bids import DerivativesDataSink as _DDSink - - -class DerivativesDataSink(_DDSink): - out_path_base = '' - - -__all__ = ('DerivativesDataSink',) diff --git a/fmriprep/workflows/base.py b/fmriprep/workflows/base.py index 01664861a..1f3ee5eed 100644 --- a/fmriprep/workflows/base.py +++ b/fmriprep/workflows/base.py @@ -37,11 +37,11 @@ import bids from nipype.interfaces import utility as niu from nipype.pipeline import engine as pe +from niworkflows.interfaces.bids import PrepareDerivative, SaveDerivative from niworkflows.utils.connections import listify from packaging.version import Version from .. import config -from ..interfaces import DerivativesDataSink from ..interfaces.reports import AboutSummary, SubjectSummary from ..utils.bids import dismiss_echo @@ -287,24 +287,34 @@ def init_single_subject_wf(subject_id: str): run_without_submitting=True, ) - ds_report_summary = pe.Node( - DerivativesDataSink( - base_directory=config.execution.fmriprep_dir, + prep_report_summary = pe.Node( + PrepareDerivative( desc='summary', datatype='figures', dismiss_entities=dismiss_echo(), ), + name='prep_report_summary', + run_without_submitting=True, + ) + + ds_report_summary = pe.Node( + SaveDerivative(base_directory=config.execution.fmriprep_dir), name='ds_report_summary', run_without_submitting=True, ) - ds_report_about = pe.Node( - DerivativesDataSink( - base_directory=config.execution.fmriprep_dir, + prep_report_about = pe.Node( + PrepareDerivative( desc='about', datatype='figures', dismiss_entities=dismiss_echo(), ), + name='prep_report_about', + run_without_submitting=True, + ) + + ds_report_about = pe.Node( + SaveDerivative(base_directory=config.execution.fmriprep_dir), name='ds_report_about', run_without_submitting=True, ) @@ -339,15 +349,15 @@ def init_single_subject_wf(subject_id: str): workflow.connect([ (bidssrc, bids_info, [(('bold', fix_multi_T1w_source_name), 'in_file')]), (anat_fit_wf, summary, [('outputnode.t1w_preproc', 't1w')]), - (anat_fit_wf, ds_report_summary, [('outputnode.t1w_preproc', 'source_file')]), - (anat_fit_wf, ds_report_about, [('outputnode.t1w_preproc', 'source_file')]), + (anat_fit_wf, prep_report_summary, [('outputnode.t1w_preproc', 'source_file')]), + (anat_fit_wf, prep_report_about, [('outputnode.t1w_preproc', 'source_file')]), ]) # fmt:skip else: workflow.connect([ (bidssrc, bids_info, [(('t1w', fix_multi_T1w_source_name), 'in_file')]), (bidssrc, summary, [('t1w', 't1w')]), - (bidssrc, ds_report_summary, [(('t1w', fix_multi_T1w_source_name), 'source_file')]), - (bidssrc, ds_report_about, [(('t1w', fix_multi_T1w_source_name), 'source_file')]), + (bidssrc, prep_report_summary, [(('t1w', fix_multi_T1w_source_name), 'source_file')]), + (bidssrc, prep_report_about, [(('t1w', fix_multi_T1w_source_name), 'source_file')]), ]) # fmt:skip workflow.connect([ @@ -363,8 +373,16 @@ def init_single_subject_wf(subject_id: str): (inputnode, summary, [('subjects_dir', 'subjects_dir')]), (bidssrc, summary, [('t2w', 't2w'), ('bold', 'bold')]), (bids_info, summary, [('subject', 'subject_id')]), - (summary, ds_report_summary, [('out_report', 'in_file')]), - (about, ds_report_about, [('out_report', 'in_file')]), + (summary, prep_report_summary, [('out_report', 'in_file')]), + (about, prep_report_about, [('out_report', 'in_file')]), + (prep_report_summary, ds_report_summary, [ + ('out_file', 'in_file'), + ('out_path', 'relative_path'), + ]), + (prep_report_about, ds_report_about, [ + ('out_file', 'in_file'), + ('out_path', 'relative_path'), + ]), ]) # fmt:skip # Set up the template iterator once, if used diff --git a/fmriprep/workflows/bold/base.py b/fmriprep/workflows/bold/base.py index e37692f0e..69feadfa7 100644 --- a/fmriprep/workflows/bold/base.py +++ b/fmriprep/workflows/bold/base.py @@ -32,10 +32,10 @@ from nipype.interfaces import utility as niu from nipype.pipeline import engine as pe +from niworkflows.interfaces.bids import PrepareDerivative, SaveDerivative from niworkflows.utils.connections import listify from ... import config -from ...interfaces import DerivativesDataSink from ...utils.bids import dismiss_echo from ...utils.misc import estimate_bold_mem_usage @@ -336,22 +336,34 @@ def init_bold_wf( if multiecho: t2s_reporting_wf = init_t2s_reporting_wf() - ds_report_t2scomp = pe.Node( - DerivativesDataSink( + prep_report_t2scomp = pe.Node( + PrepareDerivative( desc='t2scomp', datatype='figures', dismiss_entities=dismiss_echo(), ), + name='prep_report_t2scomp', + run_without_submitting=True, + ) + + ds_report_t2scomp = pe.Node( + SaveDerivative(), name='ds_report_t2scomp', run_without_submitting=True, ) - ds_report_t2star_hist = pe.Node( - DerivativesDataSink( + prep_report_t2star_hist = pe.Node( + PrepareDerivative( desc='t2starhist', datatype='figures', dismiss_entities=dismiss_echo(), ), + name='prep_report_t2star_hist', + run_without_submitting=True, + ) + + ds_report_t2star_hist = pe.Node( + SaveDerivative(), name='ds_report_t2star_hist', run_without_submitting=True, ) @@ -365,8 +377,16 @@ def init_bold_wf( (bold_native_wf, t2s_reporting_wf, [ ('outputnode.t2star_map', 'inputnode.t2star_file'), ]), - (t2s_reporting_wf, ds_report_t2scomp, [('outputnode.t2s_comp_report', 'in_file')]), - (t2s_reporting_wf, ds_report_t2star_hist, [('outputnode.t2star_hist', 'in_file')]), + (t2s_reporting_wf, prep_report_t2scomp, [('outputnode.t2s_comp_report', 'in_file')]), + (t2s_reporting_wf, prep_report_t2star_hist, [('outputnode.t2star_hist', 'in_file')]), + (prep_report_t2scomp, ds_report_t2scomp, [ + ('out_file', 'in_file'), + ('out_path', 'relative_path'), + ]), + (prep_report_t2star_hist, ds_report_t2star_hist, [ + ('out_file', 'in_file'), + ('out_path', 'relative_path'), + ]), ]) # fmt:skip if config.workflow.level == 'resampling': @@ -374,6 +394,7 @@ def init_bold_wf( for node in workflow.list_node_names(): if node.split('.')[-1].startswith('ds_report'): workflow.get_node(node).inputs.base_directory = fmriprep_dir + if node.split('.')[-1].startswith('prep_report'): workflow.get_node(node).inputs.source_file = bold_file return workflow @@ -584,9 +605,8 @@ def init_bold_wf( repetition_time=all_metadata[0]['RepetitionTime'], ) - ds_bold_cifti = pe.Node( - DerivativesDataSink( - base_directory=fmriprep_dir, + prep_bold_cifti = pe.Node( + PrepareDerivative( dismiss_entities=dismiss_echo(), space='fsLR', density=config.workflow.cifti_output, @@ -595,10 +615,15 @@ def init_bold_wf( TaskName=all_metadata[0].get('TaskName'), **prepare_timing_parameters(all_metadata[0]), ), + name='prep_bold_cifti', + ) + prep_bold_cifti.inputs.source_file = bold_file + + ds_bold_cifti = pe.Node( + SaveDerivative(base_directory=fmriprep_dir), name='ds_bold_cifti', run_without_submitting=True, ) - ds_bold_cifti.inputs.source_file = bold_file workflow.connect([ # Resample BOLD to MNI152NLin6Asym, may duplicate bold_std_wf above @@ -637,9 +662,14 @@ def init_bold_wf( (bold_fsLR_resampling_wf, bold_grayords_wf, [ ('outputnode.bold_fsLR', 'inputnode.bold_fsLR'), ]), - (bold_grayords_wf, ds_bold_cifti, [ + (bold_grayords_wf, prep_bold_cifti, [ ('outputnode.cifti_bold', 'in_file'), - (('outputnode.cifti_metadata', _read_json), 'meta_dict'), + ('outputnode.cifti_metadata', 'meta_dict'), + ]), + (prep_bold_cifti, ds_bold_cifti, [ + ('out_file', 'in_file'), + ('out_path', 'relative_path'), + ('out_meta', 'metadata'), ]), ]) # fmt:skip @@ -653,18 +683,21 @@ def init_bold_wf( name='bold_confounds_wf', ) - ds_confounds = pe.Node( - DerivativesDataSink( - base_directory=fmriprep_dir, + prepare_confounds = pe.Node( + PrepareDerivative( desc='confounds', suffix='timeseries', dismiss_entities=dismiss_echo(), ), + name='prepare_confounds', + run_without_submitting=True, + ) + ds_confounds = pe.Node( + SaveDerivative(base_directory=fmriprep_dir), name='ds_confounds', run_without_submitting=True, - mem_gb=config.DEFAULT_MEMORY_MIN_GB, ) - ds_confounds.inputs.source_file = bold_file + prepare_confounds.inputs.source_file = bold_file workflow.connect([ (inputnode, bold_confounds_wf, [ @@ -681,10 +714,15 @@ def init_bold_wf( (bold_native_wf, bold_confounds_wf, [ ('outputnode.bold_native', 'inputnode.bold'), ]), - (bold_confounds_wf, ds_confounds, [ + (bold_confounds_wf, prepare_confounds, [ ('outputnode.confounds_file', 'in_file'), ('outputnode.confounds_metadata', 'meta_dict'), ]), + (prepare_confounds, ds_confounds, [ + ('out_file', 'in_file'), + ('out_path', 'relative_path'), + ('out_meta', 'metadata'), + ]), ]) # fmt:skip if spaces.get_spaces(nonstandard=False, dim=(3,)): @@ -726,6 +764,7 @@ def _last(inlist): for node in workflow.list_node_names(): if node.split('.')[-1].startswith('ds_report'): workflow.get_node(node).inputs.base_directory = fmriprep_dir + if node.split('.')[-1].startswith('prep_report'): workflow.get_node(node).inputs.source_file = bold_file return workflow diff --git a/fmriprep/workflows/bold/confounds.py b/fmriprep/workflows/bold/confounds.py index 115f65604..7ecdfa26d 100644 --- a/fmriprep/workflows/bold/confounds.py +++ b/fmriprep/workflows/bold/confounds.py @@ -31,10 +31,9 @@ from nipype.algorithms import confounds as nac from nipype.interfaces import utility as niu from nipype.pipeline import engine as pe +from niworkflows.interfaces.bids import PrepareDerivative, SaveDerivative from templateflow.api import get as get_template -from ...config import DEFAULT_MEMORY_MIN_GB -from ...interfaces import DerivativesDataSink from ...interfaces.confounds import ( FilterDropped, FMRISummary, @@ -464,11 +463,16 @@ def init_bold_confs_wf( mem_gb=mem_gb, ) + prep_report_bold_rois = pe.Node( + PrepareDerivative(desc='rois', datatype='figures', dismiss_entities=dismiss_echo()), + name='prep_report_bold_rois', + run_without_submitting=True, + ) + ds_report_bold_rois = pe.Node( - DerivativesDataSink(desc='rois', datatype='figures', dismiss_entities=dismiss_echo()), + SaveDerivative(), name='ds_report_bold_rois', run_without_submitting=True, - mem_gb=DEFAULT_MEMORY_MIN_GB, ) # Generate reportlet (CompCor) @@ -483,13 +487,16 @@ def init_bold_confs_wf( name='compcor_plot', ) + prep_report_compcor = pe.Node( + PrepareDerivative(desc='compcorvar', datatype='figures', dismiss_entities=dismiss_echo()), + name='prep_report_compcor', + run_without_submitting=True, + ) + ds_report_compcor = pe.Node( - DerivativesDataSink( - desc='compcorvar', datatype='figures', dismiss_entities=dismiss_echo() - ), + SaveDerivative(), name='ds_report_compcor', run_without_submitting=True, - mem_gb=DEFAULT_MEMORY_MIN_GB, ) # Generate reportlet (Confound correlation) @@ -497,13 +504,18 @@ def init_bold_confs_wf( ConfoundsCorrelationPlot(reference_column='global_signal', max_dim=20), name='conf_corr_plot', ) - ds_report_conf_corr = pe.Node( - DerivativesDataSink( + prep_report_conf_corr = pe.Node( + PrepareDerivative( desc='confoundcorr', datatype='figures', dismiss_entities=dismiss_echo() ), + name='prep_report_conf_corr', + run_without_submitting=True, + ) + + ds_report_conf_corr = pe.Node( + SaveDerivative(), name='ds_report_conf_corr', run_without_submitting=True, - mem_gb=DEFAULT_MEMORY_MIN_GB, ) def _last(inlist): @@ -518,7 +530,6 @@ def _select_cols(table): if not col.startswith(('a_comp_cor_', 't_comp_cor_', 'std_dvars')) ] - # fmt:off workflow.connect([ # connect inputnode to each non-anatomical confound node (inputnode, dvars, [('bold', 'in_file'), @@ -607,18 +618,29 @@ def _select_cols(table): (acc_msk_bin, mrg_compcor, [(('out_file', _last), 'in2')]), (subtract_mask, mrg_compcor, [('out_mask', 'in3')]), (mrg_compcor, rois_plot, [('out', 'in_rois')]), - (rois_plot, ds_report_bold_rois, [('out_report', 'in_file')]), + (rois_plot, prep_report_bold_rois, [('out_report', 'in_file')]), + (prep_report_bold_rois, ds_report_bold_rois, [ + ('out_file', 'in_file'), + ('out_path', 'relative_path'), + ]), (tcompcor, mrg_cc_metadata, [('metadata_file', 'in1')]), (acompcor, mrg_cc_metadata, [('metadata_file', 'in2')]), (crowncompcor, mrg_cc_metadata, [('metadata_file', 'in3')]), (mrg_cc_metadata, compcor_plot, [('out', 'metadata_files')]), - (compcor_plot, ds_report_compcor, [('out_file', 'in_file')]), + (compcor_plot, prep_report_compcor, [('out_file', 'in_file')]), + (prep_report_compcor, ds_report_compcor, [ + ('out_file', 'in_file'), + ('out_path', 'relative_path'), + ]), (inputnode, conf_corr_plot, [('skip_vols', 'ignore_initial_volumes')]), (concat, conf_corr_plot, [('confounds_file', 'confounds_file'), (('confounds_file', _select_cols), 'columns')]), - (conf_corr_plot, ds_report_conf_corr, [('out_file', 'in_file')]), - ]) - # fmt: on + (conf_corr_plot, prep_report_conf_corr, [('out_file', 'in_file')]), + (prep_report_conf_corr, ds_report_conf_corr, [ + ('out_file', 'in_file'), + ('out_path', 'relative_path'), + ]), + ]) # fmt:skip return workflow @@ -709,13 +731,18 @@ def init_carpetplot_wf( name='conf_plot', mem_gb=mem_gb, ) - ds_report_bold_conf = pe.Node( - DerivativesDataSink( + prep_report_bold_conf = pe.Node( + PrepareDerivative( desc='carpetplot', datatype='figures', extension='svg', dismiss_entities=dismiss_echo() ), + name='prep_report_bold_conf', + run_without_submitting=True, + ) + + ds_report_bold_conf = pe.Node( + SaveDerivative(), name='ds_report_bold_conf', run_without_submitting=True, - mem_gb=DEFAULT_MEMORY_MIN_GB, ) parcels = pe.Node(niu.Function(function=_carpet_parcellation), name='parcels') @@ -763,7 +790,11 @@ def init_carpetplot_wf( (mrg_xfms, resample_parc, [('out', 'transforms')]), (resample_parc, parcels, [('output_image', 'segmentation')]), (parcels, conf_plot, [('out', 'in_segm')]), - (conf_plot, ds_report_bold_conf, [('out_file', 'in_file')]), + (conf_plot, prep_report_bold_conf, [('out_file', 'in_file')]), + (prep_report_bold_conf, ds_report_bold_conf, [ + ('out_file', 'in_file'), + ('out_path', 'relative_path'), + ]), (conf_plot, outputnode, [('out_file', 'out_carpetplot')]), ]) # fmt:skip return workflow diff --git a/fmriprep/workflows/bold/outputs.py b/fmriprep/workflows/bold/outputs.py index 70d1a4acf..b73a6b5b7 100644 --- a/fmriprep/workflows/bold/outputs.py +++ b/fmriprep/workflows/bold/outputs.py @@ -27,12 +27,12 @@ import numpy as np from nipype.interfaces import utility as niu from nipype.pipeline import engine as pe +from niworkflows.interfaces.bids import PrepareDerivative, SaveDerivative from niworkflows.interfaces.fixes import FixHeaderApplyTransforms as ApplyTransforms from niworkflows.utils.images import dseg_label from fmriprep import config from fmriprep.config import DEFAULT_MEMORY_MIN_GB -from fmriprep.interfaces import DerivativesDataSink from fmriprep.interfaces.bids import BIDSURI from fmriprep.utils.bids import dismiss_echo @@ -212,25 +212,27 @@ def init_func_fit_reports_wf( ] inputnode = pe.Node(niu.IdentityInterface(fields=inputfields), name='inputnode') - ds_summary = pe.Node( - DerivativesDataSink( - base_directory=output_dir, - desc='summary', - datatype='figures', - dismiss_entities=dismiss_echo(), - ), + prep_report_summary = pe.Node( + PrepareDerivative(desc='summary', datatype='figures', dismiss_entities=dismiss_echo()), + name='prep_report_summary', + run_without_submitting=True, + mem_gb=config.DEFAULT_MEMORY_MIN_GB, + ) + ds_report_summary = pe.Node( + SaveDerivative(base_directory=output_dir), name='ds_report_summary', run_without_submitting=True, mem_gb=config.DEFAULT_MEMORY_MIN_GB, ) - ds_validation = pe.Node( - DerivativesDataSink( - base_directory=output_dir, - desc='validation', - datatype='figures', - dismiss_entities=dismiss_echo(), - ), + prep_report_validation = pe.Node( + PrepareDerivative(desc='validation', datatype='figures', dismiss_entities=dismiss_echo()), + name='prep_report_validation', + run_without_submitting=True, + mem_gb=config.DEFAULT_MEMORY_MIN_GB, + ) + ds_report_validation = pe.Node( + SaveDerivative(base_directory=output_dir), name='ds_report_validation', run_without_submitting=True, mem_gb=config.DEFAULT_MEMORY_MIN_GB, @@ -267,16 +269,23 @@ def init_func_fit_reports_wf( mem_gb=1, ) - # fmt:off workflow.connect([ - (inputnode, ds_summary, [ + (inputnode, prep_report_summary, [ ('source_file', 'source_file'), ('summary_report', 'in_file'), ]), - (inputnode, ds_validation, [ + (prep_report_summary, ds_report_summary, [ + ('out_file', 'in_file'), + ('out_path', 'relative_path'), + ]), + (inputnode, prep_report_validation, [ ('source_file', 'source_file'), ('validation_report', 'in_file'), ]), + (prep_report_validation, ds_report_validation, [ + ('out_file', 'in_file'), + ('out_path', 'relative_path'), + ]), (inputnode, t1w_boldref, [ ('t1w_preproc', 'input_image'), ('coreg_boldref', 'reference_image'), @@ -288,8 +297,7 @@ def init_func_fit_reports_wf( ('boldref2anat_xfm', 'transforms'), ]), (t1w_wm, boldref_wm, [('out', 'input_image')]), - ]) - # fmt:on + ]) # fmt:skip # Reportlets follow the structure of init_bold_fit_wf stages # - SDC1: @@ -327,15 +335,22 @@ def init_func_fit_reports_wf( mem_gb=0.1, ) - ds_sdcreg_report = pe.Node( - DerivativesDataSink( - base_directory=output_dir, + prep_sdcreg_report = pe.Node( + PrepareDerivative( desc='fmapCoreg', suffix='bold', datatype='figures', dismiss_entities=dismiss_echo(), ), + name='prep_sdcreg_report', + run_without_submitting=True, + mem_gb=config.DEFAULT_MEMORY_MIN_GB, + ) + ds_sdcreg_report = pe.Node( + SaveDerivative(base_directory=output_dir), name='ds_sdcreg_report', + run_without_submitting=True, + mem_gb=config.DEFAULT_MEMORY_MIN_GB, ) # SDC2 @@ -349,18 +364,24 @@ def init_func_fit_reports_wf( mem_gb=0.1, ) - ds_sdc_report = pe.Node( - DerivativesDataSink( - base_directory=output_dir, + prep_sdc_report = pe.Node( + PrepareDerivative( desc='sdc', suffix='bold', datatype='figures', dismiss_entities=dismiss_echo(), ), + name='prep_sdc_report', + run_without_submitting=True, + mem_gb=config.DEFAULT_MEMORY_MIN_GB, + ) + ds_sdc_report = pe.Node( + SaveDerivative(base_directory=output_dir), name='ds_sdc_report', + run_without_submitting=True, + mem_gb=config.DEFAULT_MEMORY_MIN_GB, ) - # fmt:off workflow.connect([ (inputnode, fmapref_boldref, [ ('fmap_ref', 'input_image'), @@ -373,17 +394,24 @@ def init_func_fit_reports_wf( ('bold_mask', 'mask'), ]), (fmapref_boldref, sdcreg_report, [('output_image', 'moving')]), - (inputnode, ds_sdcreg_report, [('source_file', 'source_file')]), - (sdcreg_report, ds_sdcreg_report, [('out_report', 'in_file')]), + (inputnode, prep_sdcreg_report, [('source_file', 'source_file')]), + (sdcreg_report, prep_sdcreg_report, [('out_report', 'in_file')]), + (prep_sdcreg_report, ds_sdcreg_report, [ + ('out_file', 'in_file'), + ('out_path', 'relative_path'), + ]), (inputnode, sdc_report, [ ('sdc_boldref', 'before'), ('coreg_boldref', 'after'), ]), (boldref_wm, sdc_report, [('output_image', 'wm_seg')]), - (inputnode, ds_sdc_report, [('source_file', 'source_file')]), - (sdc_report, ds_sdc_report, [('out_report', 'in_file')]), - ]) - # fmt:on + (inputnode, prep_sdc_report, [('source_file', 'source_file')]), + (sdc_report, prep_sdc_report, [('out_report', 'in_file')]), + (prep_sdc_report, ds_sdc_report, [ + ('out_file', 'in_file'), + ('out_path', 'relative_path'), + ]), + ]) # fmt:skip # EPI-T1 registration # Resample T1w image onto EPI-space @@ -398,26 +426,36 @@ def init_func_fit_reports_wf( mem_gb=0.1, ) - ds_epi_t1_report = pe.Node( - DerivativesDataSink( - base_directory=output_dir, + prep_epi_t1_report = pe.Node( + PrepareDerivative( desc='coreg', suffix='bold', datatype='figures', dismiss_entities=dismiss_echo(), ), + name='prep_epi_t1_report', + run_without_submitting=True, + mem_gb=config.DEFAULT_MEMORY_MIN_GB, + ) + + ds_epi_t1_report = pe.Node( + SaveDerivative(base_directory=output_dir), name='ds_epi_t1_report', + run_without_submitting=True, + mem_gb=config.DEFAULT_MEMORY_MIN_GB, ) - # fmt:off workflow.connect([ (inputnode, epi_t1_report, [('coreg_boldref', 'after')]), (t1w_boldref, epi_t1_report, [('output_image', 'before')]), (boldref_wm, epi_t1_report, [('output_image', 'wm_seg')]), - (inputnode, ds_epi_t1_report, [('source_file', 'source_file')]), - (epi_t1_report, ds_epi_t1_report, [('out_report', 'in_file')]), - ]) - # fmt:on + (inputnode, prep_epi_t1_report, [('source_file', 'source_file')]), + (epi_t1_report, prep_epi_t1_report, [('out_report', 'in_file')]), + (prep_epi_t1_report, ds_epi_t1_report, [ + ('out_file', 'in_file'), + ('out_path', 'relative_path'), + ]), + ]) # fmt:skip return workflow @@ -446,27 +484,33 @@ def init_ds_boldref_wf( name='sources', ) - ds_boldref = pe.Node( - DerivativesDataSink( - base_directory=output_dir, + prep_boldref = pe.Node( + PrepareDerivative( desc=desc, suffix='boldref', compress=True, dismiss_entities=dismiss_echo(), ), + name='prep_boldref', + ) + ds_boldref = pe.Node( + SaveDerivative(base_directory=output_dir), name='ds_boldref', - run_without_submitting=True, ) - # fmt:off workflow.connect([ (inputnode, sources, [('source_files', 'in1')]), - (inputnode, ds_boldref, [('boldref', 'in_file'), - ('source_files', 'source_file')]), - (sources, ds_boldref, [('out', 'Sources')]), + (inputnode, prep_boldref, [ + ('boldref', 'in_file'), + ('source_files', 'source_file'), + ]), + (prep_boldref, ds_boldref, [ + ('out_file', 'in_file'), + ('out_path', 'relative_path'), + ('out_meta', 'metadata'), + ]), (ds_boldref, outputnode, [('out_file', 'boldref')]), - ]) - # fmt:on + ]) # fmt:skip return workflow @@ -495,25 +539,31 @@ def init_ds_boldmask_wf( name='sources', ) - ds_boldmask = pe.Node( - DerivativesDataSink( - base_directory=output_dir, + prep_boldmask = pe.Node( + PrepareDerivative( desc=desc, suffix='mask', compress=True, dismiss_entities=dismiss_echo(), ), + name='prep_boldmask', + ) + ds_boldmask = pe.Node( + SaveDerivative(base_directory=output_dir), name='ds_boldmask', - run_without_submitting=True, ) workflow.connect([ (inputnode, sources, [('source_files', 'in1')]), - (inputnode, ds_boldmask, [ + (inputnode, prep_boldmask, [ ('boldmask', 'in_file'), ('source_files', 'source_file'), ]), - (sources, ds_boldmask, [('out', 'Sources')]), + (prep_boldmask, ds_boldmask, [ + ('out_file', 'in_file'), + ('out_path', 'relative_path'), + ('out_meta', 'metadata'), + ]), (ds_boldmask, outputnode, [('out_file', 'boldmask')]), ]) # fmt:skip @@ -545,29 +595,35 @@ def init_ds_registration_wf( name='sources', ) - ds_xform = pe.Node( - DerivativesDataSink( - base_directory=output_dir, - mode='image', + prep_xform = pe.Node( + PrepareDerivative( + desc='xfm', suffix='xfm', extension='.txt', dismiss_entities=dismiss_echo(['part']), **{'from': source, 'to': dest}, ), + name='prep_xform', + ) + ds_xform = pe.Node( + SaveDerivative(base_directory=output_dir), name='ds_xform', - run_without_submitting=True, - mem_gb=DEFAULT_MEMORY_MIN_GB, ) - # fmt:off workflow.connect([ (inputnode, sources, [('source_files', 'in1')]), - (inputnode, ds_xform, [('xform', 'in_file'), - ('source_files', 'source_file')]), - (sources, ds_xform, [('out', 'Sources')]), + (inputnode, prep_xform, [ + ('xform', 'in_file'), + ('source_files', 'source_file'), + ]), + (sources, prep_xform, [('out', 'Sources')]), + (prep_xform, ds_xform, [ + ('out_file', 'in_file'), + ('out_path', 'relative_path'), + ('out_meta', 'metadata'), + ]), (ds_xform, outputnode, [('out_file', 'xform')]), - ]) - # fmt:on + ]) # fmt:skip return workflow @@ -595,9 +651,8 @@ def init_ds_hmc_wf( name='sources', ) - ds_xforms = pe.Node( - DerivativesDataSink( - base_directory=output_dir, + prep_xforms = pe.Node( + PrepareDerivative( desc='hmc', suffix='xfm', extension='.txt', @@ -605,19 +660,29 @@ def init_ds_hmc_wf( dismiss_entities=dismiss_echo(), **{'from': 'orig', 'to': 'boldref'}, ), + name='prep_xforms', + run_without_submitting=True, + ) + ds_xforms = pe.Node( + SaveDerivative(base_directory=output_dir), name='ds_xforms', run_without_submitting=True, ) - # fmt:off workflow.connect([ (inputnode, sources, [('source_files', 'in1')]), - (inputnode, ds_xforms, [('xforms', 'in_file'), - ('source_files', 'source_file')]), - (sources, ds_xforms, [('out', 'Sources')]), + (inputnode, prep_xforms, [ + ('xforms', 'in_file'), + ('source_files', 'source_file'), + ]), + (sources, prep_xforms, [('out', 'Sources')]), + (prep_xforms, ds_xforms, [ + ('out_file', 'in_file'), + ('out_path', 'relative_path'), + ('out_meta', 'metadata'), + ]), (ds_xforms, outputnode, [('out_file', 'xforms')]), - ]) - # fmt:on + ]) # fmt:skip return workflow @@ -668,9 +733,8 @@ def init_ds_bold_native_wf( ]) # fmt:skip if bold_output: - ds_bold = pe.Node( - DerivativesDataSink( - base_directory=output_dir, + prep_bold = pe.Node( + PrepareDerivative( desc='preproc', compress=True, SkullStripped=multiecho, @@ -678,15 +742,26 @@ def init_ds_bold_native_wf( dismiss_entities=dismiss_echo(), **timing_parameters, ), + name='prep_bold', + mem_gb=DEFAULT_MEMORY_MIN_GB, + ) + ds_bold = pe.Node( + SaveDerivative(base_directory=output_dir), name='ds_bold', mem_gb=DEFAULT_MEMORY_MIN_GB, ) + workflow.connect([ - (inputnode, ds_bold, [ + (inputnode, prep_bold, [ ('source_files', 'source_file'), ('bold', 'in_file'), ]), - (sources, ds_bold, [('out', 'Sources')]), + (sources, prep_bold, [('out', 'Sources')]), + (prep_bold, ds_bold, [ + ('out_file', 'in_file'), + ('out_path', 'relative_path'), + ('out_meta', 'metadata'), + ]), ]) # fmt:skip if bold_output and multiecho: @@ -695,31 +770,40 @@ def init_ds_bold_native_wf( 'EstimationReference': 'doi:10.1002/mrm.20900', 'EstimationAlgorithm': 'monoexponential decay model', } - ds_t2star = pe.Node( - DerivativesDataSink( - base_directory=output_dir, + prep_t2star = pe.Node( + PrepareDerivative( space='boldref', suffix='T2starmap', compress=True, dismiss_entities=dismiss_echo(), **t2star_meta, ), - name='ds_t2star_bold', + name='prep_t2star_bold', run_without_submitting=True, mem_gb=DEFAULT_MEMORY_MIN_GB, ) + ds_t2star = pe.Node( + SaveDerivative(base_directory=output_dir), + name='ds_t2star_bold', + mem_gb=DEFAULT_MEMORY_MIN_GB, + ) + workflow.connect([ - (inputnode, ds_t2star, [ + (inputnode, prep_t2star, [ ('source_files', 'source_file'), ('t2star', 'in_file'), ]), - (sources, ds_t2star, [('out', 'Sources')]), + (sources, prep_t2star, [('out', 'Sources')]), + (prep_t2star, ds_t2star, [ + ('out_file', 'in_file'), + ('out_path', 'relative_path'), + ('out_meta', 'metadata'), + ]), ]) # fmt:skip if echo_output: - ds_bold_echos = pe.MapNode( - DerivativesDataSink( - base_directory=output_dir, + prep_bold_echos = pe.MapNode( + PrepareDerivative( desc='preproc', compress=True, SkullStripped=False, @@ -727,16 +811,27 @@ def init_ds_bold_native_wf( **timing_parameters, ), iterfield=['source_file', 'in_file', 'meta_dict'], + name='prep_bold_echos', + mem_gb=DEFAULT_MEMORY_MIN_GB, + ) + prep_bold_echos.inputs.meta_dict = [{'EchoTime': md['EchoTime']} for md in all_metadata] + ds_bold_echos = pe.MapNode( + SaveDerivative(base_directory=output_dir), + iterfield=['in_file', 'source_file'], name='ds_bold_echos', - run_without_submitting=True, mem_gb=DEFAULT_MEMORY_MIN_GB, ) - ds_bold_echos.inputs.meta_dict = [{'EchoTime': md['EchoTime']} for md in all_metadata] + workflow.connect([ - (inputnode, ds_bold_echos, [ + (inputnode, prep_bold_echos, [ ('source_files', 'source_file'), ('bold_echos', 'in_file'), ]), + (prep_bold_echos, ds_bold_echos, [ + ('out_file', 'in_file'), + ('out_path', 'relative_path'), + ('out_meta', 'metadata'), + ]), ]) # fmt:skip return workflow @@ -790,9 +885,8 @@ def init_ds_volumes_wf( boldref2target = pe.Node(niu.Merge(2), name='boldref2target') # BOLD is pre-resampled - ds_bold = pe.Node( - DerivativesDataSink( - base_directory=output_dir, + prep_bold = pe.Node( + PrepareDerivative( desc='preproc', compress=True, SkullStripped=multiecho, @@ -800,6 +894,11 @@ def init_ds_volumes_wf( dismiss_entities=dismiss_echo(), **timing_parameters, ), + name='prep_bold', + mem_gb=DEFAULT_MEMORY_MIN_GB, + ) + ds_bold = pe.Node( + SaveDerivative(base_directory=output_dir), name='ds_bold', mem_gb=DEFAULT_MEMORY_MIN_GB, ) @@ -818,14 +917,19 @@ def init_ds_volumes_wf( ('anat2std_xfm', 'in1'), ('boldref2anat_xfm', 'in2'), ]), - (inputnode, ds_bold, [ + (inputnode, prep_bold, [ ('source_files', 'source_file'), ('bold', 'in_file'), ('space', 'space'), ('cohort', 'cohort'), ('resolution', 'resolution'), ]), - (sources, ds_bold, [('out', 'Sources')]), + (sources, prep_bold, [('out', 'Sources')]), + (prep_bold, ds_bold, [ + ('out_file', 'in_file'), + ('out_path', 'relative_path'), + ('out_meta', 'metadata'), + ]), ]) # fmt:skip resample_ref = pe.Node( @@ -845,30 +949,29 @@ def init_ds_volumes_wf( (inputnode, resample_mask, [('bold_mask', 'input_image')]), ]) # fmt:skip - ds_ref = pe.Node( - DerivativesDataSink( - base_directory=output_dir, + prep_ref = pe.Node( + PrepareDerivative( suffix='boldref', compress=True, dismiss_entities=dismiss_echo(), ), - name='ds_ref', + name='prep_ref', run_without_submitting=True, mem_gb=DEFAULT_MEMORY_MIN_GB, ) - ds_mask = pe.Node( - DerivativesDataSink( - base_directory=output_dir, + + prep_mask = pe.Node( + PrepareDerivative( desc='brain', suffix='mask', compress=True, dismiss_entities=dismiss_echo(), ), - name='ds_mask', + name='prep_mask', run_without_submitting=True, mem_gb=DEFAULT_MEMORY_MIN_GB, ) - datasinks = [ds_ref, ds_mask] + prep_nodes = [prep_ref, prep_mask] if multiecho: t2star_meta = { @@ -885,23 +988,31 @@ def init_ds_volumes_wf( ), name='resample_t2star', ) - ds_t2star = pe.Node( - DerivativesDataSink( - base_directory=output_dir, + + prep_t2star_std = pe.Node( + PrepareDerivative( suffix='T2starmap', compress=True, dismiss_entities=dismiss_echo(), **t2star_meta, ), - name='ds_t2star_std', + name='prep_t2star_std', run_without_submitting=True, mem_gb=DEFAULT_MEMORY_MIN_GB, ) resamplers.append(resample_t2star) - datasinks.append(ds_t2star) + prep_nodes.append(prep_t2star_std) workflow.connect([(inputnode, resample_t2star, [('t2star', 'input_image')])]) + def make_ds(name: str) -> pe.Node: + return pe.Node( + SaveDerivative(base_directory=output_dir), + name=name.replace('prep', 'ds'), + run_without_submitting=True, + mem_gb=DEFAULT_MEMORY_MIN_GB, + ) + workflow.connect( [ (inputnode, resampler, [('ref_file', 'reference_image')]) @@ -910,19 +1021,26 @@ def init_ds_volumes_wf( (boldref2target, resampler, [('out', 'transforms')]) for resampler in resamplers ] + [ - (inputnode, datasink, [ + (inputnode, prep_node, [ ('source_files', 'source_file'), ('space', 'space'), ('cohort', 'cohort'), ('resolution', 'resolution'), ]) - for datasink in datasinks + for prep_node in prep_nodes ] + [ - (sources, datasink, [('out', 'Sources')]) - for datasink in datasinks + (sources, prep_node, [('out', 'Sources')]) + for prep_node in prep_nodes ] + [ - (resampler, datasink, [('output_image', 'in_file')]) - for resampler, datasink in zip(resamplers, datasinks, strict=False) + (resampler, prep_node, [('output_image', 'in_file')]) + for resampler, prep_node in zip(resamplers, prep_nodes, strict=True) + ] + [ + (prep_node, make_ds(prep_node.name), [ + ('out_file', 'in_file'), + ('out_path', 'relative_path'), + ('out_meta', 'metadata'), + ]) + for prep_node in prep_nodes ] ) # fmt:skip @@ -972,8 +1090,6 @@ def init_bold_preproc_report_wf( from nireports.interfaces.reporting.base import SimpleBeforeAfterRPT from niworkflows.engine.workflows import LiterateWorkflow as Workflow - from ...interfaces import DerivativesDataSink - workflow = Workflow(name=name) inputnode = pe.Node( @@ -984,26 +1100,35 @@ def init_bold_preproc_report_wf( pos_tsnr = pe.Node(TSNR(), name='pos_tsnr', mem_gb=mem_gb * 4.5) bold_rpt = pe.Node(SimpleBeforeAfterRPT(), name='bold_rpt', mem_gb=0.1) - ds_report_bold = pe.Node( - DerivativesDataSink( - base_directory=reportlets_dir, + + prep_report_bold = pe.Node( + PrepareDerivative( desc='preproc', datatype='figures', dismiss_entities=dismiss_echo(), ), - name='ds_report_bold', + name='prep_report_bold', + run_without_submitting=True, mem_gb=DEFAULT_MEMORY_MIN_GB, + ) + ds_report_bold = pe.Node( + SaveDerivative(base_directory=reportlets_dir), + name='ds_report_bold', run_without_submitting=True, + mem_gb=DEFAULT_MEMORY_MIN_GB, ) - # fmt:off + workflow.connect([ - (inputnode, ds_report_bold, [('name_source', 'source_file')]), + (inputnode, prep_report_bold, [('name_source', 'source_file')]), (inputnode, pre_tsnr, [('in_pre', 'in_file')]), (inputnode, pos_tsnr, [('in_post', 'in_file')]), (pre_tsnr, bold_rpt, [('stddev_file', 'before')]), (pos_tsnr, bold_rpt, [('stddev_file', 'after')]), - (bold_rpt, ds_report_bold, [('out_report', 'in_file')]), - ]) - # fmt:on + (bold_rpt, prep_report_bold, [('out_report', 'in_file')]), + (prep_report_bold, ds_report_bold, [ + ('out_file', 'in_file'), + ('out_path', 'relative_path'), + ]), + ]) # fmt:skip return workflow diff --git a/fmriprep/workflows/bold/resampling.py b/fmriprep/workflows/bold/resampling.py index d725da6f0..15a3addbc 100644 --- a/fmriprep/workflows/bold/resampling.py +++ b/fmriprep/workflows/bold/resampling.py @@ -114,11 +114,10 @@ def init_bold_surf_wf( """ from nipype.interfaces.io import FreeSurferSource from niworkflows.engine.workflows import LiterateWorkflow as Workflow + from niworkflows.interfaces.bids import PrepareDerivative, SaveDerivative from niworkflows.interfaces.nitransforms import ConcatenateXFMs from niworkflows.interfaces.surf import GiftiSetAnatomicalStructure - from fmriprep.interfaces import DerivativesDataSink - timing_parameters = prepare_timing_parameters(metadata) workflow = Workflow(name=name) @@ -191,20 +190,27 @@ def select_target(subject_id, space): mem_gb=DEFAULT_MEMORY_MIN_GB, ) - ds_bold_surfs = pe.MapNode( - DerivativesDataSink( - base_directory=output_dir, + prep_bold_surfs = pe.MapNode( + PrepareDerivative( extension='.func.gii', dismiss_entities=dismiss_echo(), TaskName=metadata.get('TaskName'), **timing_parameters, ), iterfield=['in_file', 'hemi'], + name='prep_bold_surfs', + run_without_submitting=True, + mem_gb=DEFAULT_MEMORY_MIN_GB, + ) + prep_bold_surfs.inputs.hemi = ['L', 'R'] + + ds_bold_surfs = pe.MapNode( + SaveDerivative(base_directory=output_dir), + iterfield=['in_file'], name='ds_bold_surfs', run_without_submitting=True, mem_gb=DEFAULT_MEMORY_MIN_GB, ) - ds_bold_surfs.inputs.hemi = ['L', 'R'] workflow.connect([ (inputnode, get_fsnative, [ @@ -225,11 +231,16 @@ def select_target(subject_id, space): (itersource, targets, [('target', 'space')]), (itk2lta, sampler, [('out_inv', 'reg_file')]), (targets, sampler, [('out', 'target_subject')]), - (inputnode, ds_bold_surfs, [('source_file', 'source_file')]), + (inputnode, prep_bold_surfs, [('source_file', 'source_file')]), (inputnode, surfs_sources, [('sources', 'in1')]), - (surfs_sources, ds_bold_surfs, [('out', 'Sources')]), - (itersource, ds_bold_surfs, [('target', 'space')]), - (update_metadata, ds_bold_surfs, [('out_file', 'in_file')]), + (surfs_sources, prep_bold_surfs, [('out', 'Sources')]), + (itersource, prep_bold_surfs, [('target', 'space')]), + (update_metadata, prep_bold_surfs, [('out_file', 'in_file')]), + (prep_bold_surfs, ds_bold_surfs, [ + ('out_file', 'in_file'), + ('out_path', 'relative_path'), + ('out_meta', 'metadata'), + ]), ]) # fmt:skip # Refine if medial vertices should be NaNs