From 2e62e868b0bbf6fc0692510ad6f7d87ef3a02686 Mon Sep 17 00:00:00 2001 From: elodiegermani1 Date: Thu, 20 Jul 2023 16:58:59 -0400 Subject: [PATCH 01/11] [1KB2] Reproduction adaptation to new structure and debug. --- narps_open/pipelines/team_1KB2_debug.py | 1673 ++++++++++++++--------- 1 file changed, 1047 insertions(+), 626 deletions(-) diff --git a/narps_open/pipelines/team_1KB2_debug.py b/narps_open/pipelines/team_1KB2_debug.py index 08afb164..257da340 100755 --- a/narps_open/pipelines/team_1KB2_debug.py +++ b/narps_open/pipelines/team_1KB2_debug.py @@ -1,197 +1,200 @@ -from nipype.interfaces.fsl import (BET, FAST, MCFLIRT, FLIRT, FNIRT, ApplyWarp, SUSAN, - Info, ImageMaths, IsotropicSmooth, Threshold, Level1Design, FEATModel, - L2Model, Merge, FLAMEO, ContrastMgr,Cluster, FILMGLS, Randomise, MultipleRegressDesign) -from nipype.algorithms.modelgen import SpecifyModel - -from niflow.nipype1.workflows.fmri.fsl import create_susan_smooth -from nipype.interfaces.utility import IdentityInterface, Function -from nipype.interfaces.io import SelectFiles, DataSink -from nipype.algorithms.misc import Gunzip -from nipype import Workflow, Node, MapNode -from nipype.interfaces.base import Bunch - -from os.path import join as opj -import os - -def get_preprocessing_1st_step(exp_dir, result_dir, working_dir, output_dir, subject_list, run_list, fwhm): - """ - Returns the 1st step of the preprocessing workflow. - - Parameters: - - exp_dir: str, directory where raw data are stored - - result_dir: str, directory where results will be stored - - working_dir: str, name of the sub-directory for intermediate results - - output_dir: str, name of the sub-directory for final results - - subject_list: list of str, list of subject for which you want to do the preprocessing - - run_list: list of str, list of runs for which you want to do the preprocessing - - fwhm: float, fwhm for smoothing step - - Returns: - - preprocessing: Nipype WorkFlow - """ - infosource_preproc = Node(IdentityInterface(fields = ['subject_id', 'run_id']), - name = 'infosource_preproc') - - infosource_preproc.iterables = [('subject_id', subject_list), ('run_id', run_list)] +#!/usr/bin/python +# coding: utf-8 - # Templates to select files node - func_file = opj('sub-{subject_id}', 'func', - 'sub-{subject_id}_task-MGT_run-{run_id}_bold.nii.gz') +""" +This template can be use to reproduce a pipeline using FSL as main software. - template = {'func' : func_file} +- Replace all occurences of 48CD by the actual id of the team. +- All lines beging [INFO], are meant to help you during the reproduction, these can be removed +eventually. +- Also remove lines beging with [TODO], once you did what they suggested. 
+""" - # SelectFiles node - to select necessary files - selectfiles_preproc = Node(SelectFiles(template, base_directory=exp_dir), name = 'selectfiles_preproc') +# [TODO] Only import modules you use further in te code, remove others from the import section +from os.path import join - motion_correction = Node(MCFLIRT(mean_vol = True, save_plots=True, save_mats=True), name = 'motion_correction') - - datasink = Node(DataSink(base_directory=result_dir, container=output_dir), name='datasink') - - preprocessing = Workflow(base_dir = opj(result_dir, working_dir), name = "preprocessing") - - preprocessing.connect([(infosource_preproc, selectfiles_preproc, [('subject_id', 'subject_id'), - ('run_id', 'run_id')]), - (selectfiles_preproc, motion_correction, [('func', 'in_file')]), - (motion_correction, datasink, [('par_file', 'preprocess.@parameters_file')]) - ]) - - return preprocessing - -def get_preprocessing_2nd_step(exp_dir, result_dir, working_dir, output_dir, subject_list, run_list, fwhm): - """ - Returns the 2nd part of the preprocessing workflow. - - Parameters: - - exp_dir: str, directory where raw data are stored - - result_dir: str, directory where results will be stored - - working_dir: str, name of the sub-directory for intermediate results - - output_dir: str, name of the sub-directory for final results - - subject_list: list of str, list of subject for which you want to do the preprocessing - - run_list: list of str, list of runs for which you want to do the preprocessing - - fwhm: float, fwhm for smoothing step - - Returns: - - preprocessing: Nipype WorkFlow - """ - infosource_preproc = Node(IdentityInterface(fields = ['subject_id', 'run_id']), - name = 'infosource_preproc') +# [INFO] The import of base objects from Nipype, to create Workflows +from nipype import Node, Workflow, MapNode - infosource_preproc.iterables = [('subject_id', subject_list), ('run_id', run_list)] - - # Templates to select files node - anat_file = opj('sub-{subject_id}', 'anat', - 'sub-{subject_id}_T1w.nii.gz') +# [INFO] a list of interfaces used to manpulate data +from nipype.interfaces.utility import IdentityInterface, Function +from nipype.interfaces.io import SelectFiles, DataSink +# from nipype.algorithms.misc import Gunzip - func_file = opj('sub-{subject_id}', 'func', - 'sub-{subject_id}_task-MGT_run-{run_id}_bold.nii.gz') - - mean_file = opj(result_dir, working_dir, 'preprocessing', '_run_id_{run_id}_subject_id_{subject_id}', - 'motion_correction', 'sub-{subject_id}_task-MGT_run-{run_id}_bold_mcf.nii.gz_mean_reg.nii.gz') - - motion_corrected_file = opj(result_dir, working_dir, 'preprocessing', '_run_id_{run_id}_subject_id_{subject_id}', - 'motion_correction', 'sub-{subject_id}_task-MGT_run-{run_id}_bold_mcf.nii.gz') +# [INFO] a list of FSL-specific interfaces +from nipype.interfaces.fsl import (BET, FAST, MCFLIRT, FLIRT, FNIRT, ApplyWarp, SUSAN, + Info, ImageMaths, IsotropicSmooth, Threshold, Level1Design, FEATModel, + L2Model, Merge, FLAMEO, ContrastMgr,Cluster, FILMGLS, Randomise, + MultipleRegressDesign, ImageStats, ExtractROI, PlotMotionParams, MeanImage) +from nipype.algorithms.modelgen import SpecifyModel +from niflow.nipype1.workflows.fmri.fsl import create_reg_workflow, create_featreg_preproc + +# [INFO] In order to inherit from Pipeline +from narps_open.pipelines import Pipeline + +class PipelineTeam1KB2(Pipeline): + """ A class that defines the pipeline of team 1KB2 """ + + def __init__(self): + # [INFO] Remove the init method completely if unused + # [TODO] Init the attributes of the pipeline, 
if any other than the ones defined + # in the pipeline class + pass + + def get_preprocessing(self): + """ Return a Nipype worflow describing the prerpocessing part of the pipeline """ + + # [INFO] The following part stays the same for all preprocessing pipelines + + # IdentityInterface node - allows to iterate over subjects and runs + info_source = Node( + IdentityInterface(fields=['subject_id', 'run_id']), + name='info_source' + ) + info_source.iterables = [ + ('subject_id', self.subject_list), + ('run_id', self.run_list), + ] + + # Templates to select files node + file_templates = { + 'anat': join( + 'sub-{subject_id}', 'anat', 'sub-{subject_id}_T1w.nii.gz' + ), + 'func': join( + 'sub-{subject_id}', 'func', 'sub-{subject_id}_task-MGT_run-{run_id}_bold.nii.gz' + ) + } + + # SelectFiles node - to select necessary files + select_files = Node( + SelectFiles(file_templates, base_directory = self.directories.dataset_dir), + name='select_files' + ) + + # DataSink Node - store the wanted results in the wanted repository + data_sink = Node( + DataSink(base_directory = self.directories.output_dir), + name='data_sink', + ) + + img2float = Node( + ImageMaths(out_data_type='float', op_string='', suffix='_dtype'), + name='img2float', + ) - parameters_file = opj(result_dir, working_dir, 'preprocessing', '_run_id_{run_id}_subject_id_{subject_id}', - 'motion_correction', 'sub-{subject_id}_task-MGT_run-{run_id}_bold_mcf.nii.gz.par') - - template = {'anat' : anat_file, 'func' : func_file, 'mean':mean_file, 'motion_corrected':motion_corrected_file, - 'param':parameters_file} - - # SelectFiles node - to select necessary files - selectfiles_preproc = Node(SelectFiles(template, base_directory=exp_dir), name = 'selectfiles_preproc') - - skullstrip = Node(BET(frac = 0.3, robust = True), name = 'skullstrip') - - fast = Node(FAST(), name='fast') - #register.connect(stripper, 'out_file', fast, 'in_files') - - binarize = Node(ImageMaths(op_string='-nan -thr 0.5 -bin'), name='binarize') - pickindex = lambda x, i: x[i] - #register.connect(fast, ('partial_volume_files', pickindex, 2), binarize, - # 'in_file') - - motion_correction = Node(MCFLIRT(mean_vol = True, save_plots=True, save_mats=True), name = 'motion_correction') - - mean2anat = Node(FLIRT(dof = 12), name = 'mean2anat') - - mean2anat_bbr = Node(FLIRT(dof = 12, cost = 'bbr', schedule = opj(os.getenv('FSLDIR'), 'etc/flirtsch/bbr.sch')), - name = 'mean2anat_bbr') - - anat2mni_linear = Node(FLIRT(dof = 12, reference = Info.standard_image('MNI152_T1_2mm.nii.gz')), - name = 'anat2mni_linear') - - anat2mni_nonlinear = Node(FNIRT(fieldcoeff_file = True, ref_file = Info.standard_image('MNI152_T1_2mm.nii.gz')), - name = 'anat2mni_nonlinear') - - warp_all = Node(ApplyWarp(interp='spline', ref_file = Info.standard_image('MNI152_T1_2mm.nii.gz')), - name='warp_all') - - smooth = create_susan_smooth() - smooth.inputs.inputnode.fwhm = fwhm - smooth.inputs.inputnode.mask_file = Info.standard_image('MNI152_T1_2mm_brain_mask.nii.gz') - - datasink = Node(DataSink(base_directory=result_dir, container=output_dir), name='datasink') - - preprocessing = Workflow(base_dir = opj(result_dir, working_dir), name = "preprocessing") - - preprocessing.connect([(infosource_preproc, selectfiles_preproc, [('subject_id', 'subject_id'), - ('run_id', 'run_id')]), - (selectfiles_preproc, skullstrip, [('anat', 'in_file')]), - (selectfiles_preproc, motion_correction, [('func', 'in_file')]), - (skullstrip, fast, [('out_file', 'in_files')]), - (fast, binarize, [(('partial_volume_files', pickindex, 2), 
'in_file')]), - (selectfiles_preproc, mean2anat, [('mean', 'in_file')]), - (skullstrip, mean2anat, [('out_file', 'reference')]), - (selectfiles_preproc, mean2anat_bbr, [('mean', 'in_file')]), - (binarize, mean2anat_bbr, [('out_file', 'wm_seg')]), - (selectfiles_preproc, mean2anat_bbr, [('anat', 'reference')]), - (mean2anat, mean2anat_bbr, [('out_matrix_file', 'in_matrix_file')]), - (skullstrip, anat2mni_linear, [('out_file', 'in_file')]), - (anat2mni_linear, anat2mni_nonlinear, [('out_matrix_file', 'affine_file')]), - (selectfiles_preproc, anat2mni_nonlinear, [('anat', 'in_file')]), - (selectfiles_preproc, warp_all, [('motion_corrected', 'in_file')]), - (mean2anat_bbr, warp_all, [('out_matrix_file', 'premat')]), - (anat2mni_nonlinear, warp_all, [('fieldcoeff_file', 'field_file')]), - (warp_all, smooth, [('out_file', 'inputnode.in_files')]), - (smooth, datasink, [('outputnode.smoothed_files', 'preprocess.@smoothed_file')]), - (selectfiles_preproc, datasink, [('param', 'preprocess.@parameters_file')]) - ]) + extract_mean = Node( + MeanImage(), + name = 'extract_mean', + ) + + reg = create_reg_workflow() + reg.inputs.inputspec.target_image = Info.standard_image('MNI152_T1_2mm_brain.nii.gz') + reg.inputs.inputspec.target_image_brain = Info.standard_image('MNI152_T1_2mm_brain.nii.gz') + + mc_smooth = create_featreg_preproc( + name='featpreproc', + highpass=True, + whichvol='middle', + whichrun=0) + + mc_smooth.inputs.inputspec.fwhm = 7 + mc_smooth.inputs.inputspec.highpass = 100 + + # [INFO] The following part defines the nipype workflow and the connections between nodes + + preprocessing = Workflow( + base_dir = self.directories.working_dir, + name = 'preprocessing' + ) + + preprocessing.connect( + [ + ( + info_source, + select_files, + [('subject_id', 'subject_id'), + ('run_id', 'run_id')], + ), + ( + select_files, + img2float, + [('func', 'in_file')], + ), + ( + img2float, + extract_mean, + [('out_file', 'in_file')], + ), + ( + extract_mean, + reg, + [('out_file', 'inputspec.mean_image')], + ), + ( + select_files, + reg, + [('func', 'inputspec.source_files'), ('anat', 'inputspec.anatomical_image')], + ), + ( + select_files, + mc_smooth, + [('func', 'inputspec.func')], + ), + ( + reg, + data_sink, + [('outputspec.anat2target_transform', 'preprocess.@transfo_all'), + ('outputspec.func2anat_transform', 'preprocess.@transfo_init')], + ), + ( + mc_smooth, + data_sink, + [('outputspec.motion_parameters', 'preprocess.@parameters_file'), + ('outputspec.highpassed_files', 'preprocess.@hp'), + ('outputspec.smoothed_files', 'preprocess.@smooth'), + ('outputspec.mask', 'preprocess.@mask')], + ) + ] + ) + + # [INFO] Here we simply return the created workflow + return preprocessing + + # [INFO] This function is used in the subject level analysis pipelines using FSL + def get_subject_infos(event_file: str): + """ + Create Bunchs for specifyModel. + + Parameters : + - event_file : file corresponding to the run and the subject to analyze + + Returns : + - subject_info : list of Bunch for 1st level analysis. + """ + + from os.path import join as opj + from nipype.interfaces.base import Bunch + + cond_names = ['trial', 'gain', 'loss'] + + onset = {} + duration = {} + amplitude = {} + + for c in cond_names: # For each condition. 
+ onset.update({c : []}) # creates dictionary items with empty lists + duration.update({c : []}) + amplitude.update({c : []}) - return preprocessing + with open(event_file, 'rt') as f: + next(f) # skip the header -def get_session_infos(event_file): - ''' - Create Bunchs for specifyModel. - - Parameters : - - event_file : str, file corresponding to the run and the subject to analyze - - Returns : - - subject_info : list of Bunch for 1st level analysis. - ''' - from os.path import join as opj - from nipype.interfaces.base import Bunch - - cond_names = ['trial', 'gain', 'loss'] - - onset = {} - duration = {} - amplitude = {} - - for c in cond_names: # For each condition. - onset.update({c : []}) # creates dictionary items with empty lists - duration.update({c : []}) - amplitude.update({c : []}) - - with open(event_file, 'rt') as f: - next(f) # skip the header - - for line in f: - info = line.strip().split() - # Creates list with onsets, duration and loss/gain for amplitude (FSL) - for c in cond_names: - if info[5] != 'NoResp': + for line in f: + info = line.strip().split() + # Creates list with onsets, duration and loss/gain for amplitude (FSL) + for c in cond_names: onset[c].append(float(info[0])) duration[c].append(float(4)) if c == 'gain': @@ -211,453 +214,871 @@ def get_session_infos(event_file): amplitude[c].append(float(0)) else: amplitude[c].append(float(0)) - - - subject_info = [] - - subject_info.append(Bunch(conditions=cond_names, - onsets=[onset[k] for k in cond_names], - durations=[duration[k] for k in cond_names], - amplitudes=[amplitude[k] for k in cond_names], - regressor_names=None, - regressors=None)) - - return subject_info - -# Linear contrast effects: 'Gain' vs. baseline, 'Loss' vs. baseline. -def get_contrasts(subject_id): - ''' - Create the list of tuples that represents contrasts. - Each contrast is in the form : - (Name,Stat,[list of condition names],[weights on those conditions]) - - Parameters: - - subject_id: str, ID of the subject - - Returns: - - contrasts: list of tuples, list of contrasts to analyze - ''' - # list of condition names - conditions = ['gain', 'loss'] - - # create contrasts - gain = ('gain', 'T', conditions, [1, 0]) - - loss = ('loss', 'T', conditions, [0, 1]) - - # contrast list - contrasts = [gain, loss] - - return contrasts - - -def rm_preproc_files(files, subject_id, run_id, result_dir, working_dir): - import shutil - from os.path import join as opj - - preproc_dir = opj(result_dir, working_dir, 'preprocessing', f"_run_id_{run_id}_subject_id_{subject_id}") - - try: - shutil.rmtree(preproc_dir) - except OSError as e: - print(e) - else: - print("The directory is deleted successfully") - -def get_l1_analysis(subject_list, run_list, TR, exp_dir, output_dir, working_dir, result_dir): - """ - Returns the first level analysis workflow. 
- - Parameters: - - exp_dir: str, directory where raw data are stored - - result_dir: str, directory where results will be stored - - working_dir: str, name of the sub-directory for intermediate results - - output_dir: str, name of the sub-directory for final results - - subject_list: list of str, list of subject for which you want to do the analysis - - run_list: list of str, list of runs for which you want to do the analysis - - TR: float, time repetition used during acquisition - - Returns: - - l1_analysis : Nipype WorkFlow - """ - # Infosource Node - To iterate on subject and runs - infosource = Node(IdentityInterface(fields = ['subject_id', 'run_id']), name = 'infosource') - infosource.iterables = [('subject_id', subject_list), - ('run_id', run_list)] - - # Templates to select files node - func_file = opj(result_dir, output_dir, 'preprocess', '_run_id_{run_id}_subject_id_{subject_id}','sub-{subject_id}_task-MGT_run-{run_id}_bold_mcf_warp_smooth.nii.gz') - - event_file = opj('sub-{subject_id}', 'func', 'sub-{subject_id}_task-MGT_run-{run_id}_events.tsv') - - param_file = opj(result_dir, output_dir, 'preprocess', '_run_id_{run_id}_subject_id_{subject_id}', 'sub-{subject_id}_task-MGT_run-{run_id}_bold_mcf.nii.gz.par') - - template = {'func' : func_file, 'event' : event_file, 'param' : param_file} - - # SelectFiles node - to select necessary files - selectfiles = Node(SelectFiles(template, base_directory=exp_dir), name = 'selectfiles') - - # DataSink Node - store the wanted results in the wanted repository - datasink = Node(DataSink(base_directory=result_dir, container=output_dir), name='datasink') - - # Node contrasts to get contrasts - contrasts = Node(Function(function=get_contrasts, - input_names=['subject_id'], - output_names=['contrasts']), - name='contrasts') - - # Get Subject Info - get subject specific condition information - get_subject_infos = Node(Function(input_names=['event_file'], - output_names=['subject_info'], - function=get_session_infos), - name='get_subject_infos') - - specify_model = Node(SpecifyModel(high_pass_filter_cutoff = 60, - input_units = 'secs', - time_repetition = TR), name = 'specify_model') - - l1_design = Node(Level1Design(bases = {'dgamma':{'derivs' : True}}, - interscan_interval = TR, - model_serial_correlations = True), - name = 'l1_design') - - model_generation = Node(FEATModel(), name = 'model_generation') - - model_estimate = Node(FILMGLS(), name='model_estimate') - - remove_preproc = Node(Function(input_names = ['subject_id', 'run_id', 'files', 'result_dir', 'working_dir'], - function = rm_preproc_files), name = 'remove_preproc') - - remove_preproc.inputs.result_dir = result_dir - remove_preproc.inputs.working_dir = working_dir - - # Create l1 analysis workflow and connect its nodes - l1_analysis = Workflow(base_dir = opj(result_dir, working_dir), name = "l1_analysis") - - l1_analysis.connect([(infosource, selectfiles, [('subject_id', 'subject_id'), - ('run_id', 'run_id')]), - (selectfiles, get_subject_infos, [('event', 'event_file')]), - (infosource, contrasts, [('subject_id', 'subject_id')]), - (selectfiles, specify_model, [('param', 'realignment_parameters')]), - (selectfiles, specify_model, [('func', 'functional_runs')]), - (get_subject_infos, specify_model, [('subject_info', 'subject_info')]), - (contrasts, l1_design, [('contrasts', 'contrasts')]), - (specify_model, l1_design, [('session_info', 'session_info')]), - (l1_design, model_generation, [('ev_files', 'ev_files'), ('fsf_files', 'fsf_file')]), - (selectfiles, model_estimate, [('func', 
'in_file')]), - (model_generation, model_estimate, [('con_file', 'tcon_file'), - ('design_file', 'design_file')]), - (model_estimate, datasink, [('results_dir', 'l1_analysis.@results')]), - (model_generation, datasink, [('design_file', 'l1_analysis.@design_file'), - ('design_image', 'l1_analysis.@design_img')]) - ]) - - return l1_analysis - -def get_l2_analysis(subject_list, contrast_list, run_list, exp_dir, output_dir, working_dir, result_dir): - """ - Returns the 2nd level of analysis workflow. - - Parameters: - - exp_dir: str, directory where raw data are stored - - result_dir: str, directory where results will be stored - - working_dir: str, name of the sub-directory for intermediate results - - output_dir: str, name of the sub-directory for final results - - subject_list: list of str, list of subject for which you want to do the preprocessing - - contrast_list: list of str, list of contrasts to analyze - - run_list: list of str, list of runs for which you want to do the analysis - - - Returns: - - l2_analysis: Nipype WorkFlow - """ - # Infosource Node - To iterate on subject and runs - infosource_2ndlevel = Node(IdentityInterface(fields=['subject_id', 'contrast_id']), name='infosource_2ndlevel') - infosource_2ndlevel.iterables = [('subject_id', subject_list), ('contrast_id', contrast_list)] - - # Templates to select files node - copes_file = opj(output_dir, 'l1_analysis', '_run_id_*_subject_id_{subject_id}', 'results', - 'cope{contrast_id}.nii.gz') - - varcopes_file = opj(output_dir, 'l1_analysis', '_run_id_*_subject_id_{subject_id}', 'results', - 'varcope{contrast_id}.nii.gz') - - template = {'cope' : copes_file, 'varcope' : varcopes_file} - - # SelectFiles node - to select necessary files - selectfiles_2ndlevel = Node(SelectFiles(template, base_directory=result_dir), name = 'selectfiles_2ndlevel') - - datasink_2ndlevel = Node(DataSink(base_directory=result_dir, container=output_dir), name='datasink_2ndlevel') - - # Generate design matrix - specify_model_2ndlevel = Node(L2Model(num_copes = len(run_list)), name='l2model_2ndlevel') - - # Merge copes and varcopes files for each subject - merge_copes_2ndlevel = Node(Merge(dimension='t'), name='merge_copes_2ndlevel') - - merge_varcopes_2ndlevel = Node(Merge(dimension='t'), name='merge_varcopes_2ndlevel') - - # Second level (single-subject, mean of all four scans) analyses: Fixed effects analysis. 
- flame = Node(FLAMEO(run_mode = 'fe', mask_file = Info.standard_image('MNI152_T1_2mm_brain_mask.nii.gz')), - name='flameo') - - l2_analysis = Workflow(base_dir = opj(result_dir, working_dir), name = "l2_analysis") - - l2_analysis.connect([(infosource_2ndlevel, selectfiles_2ndlevel, [('subject_id', 'subject_id'), - ('contrast_id', 'contrast_id')]), - (selectfiles_2ndlevel, merge_copes_2ndlevel, [('cope', 'in_files')]), - (selectfiles_2ndlevel, merge_varcopes_2ndlevel, [('varcope', 'in_files')]), - (merge_copes_2ndlevel, flame, [('merged_file', 'cope_file')]), - (merge_varcopes_2ndlevel, flame, [('merged_file', 'var_cope_file')]), - (specify_model_2ndlevel, flame, [('design_mat', 'design_file'), - ('design_con', 't_con_file'), - ('design_grp', 'cov_split_file')]), - (flame, datasink_2ndlevel, [('zstats', 'l2_analysis.@stats'), - ('tstats', 'l2_analysis.@tstats'), - ('copes', 'l2_analysis.@copes'), - ('var_copes', 'l2_analysis.@varcopes')])]) - - return l2_analysis - -def get_subgroups_contrasts(copes, varcopes, subject_ids, participants_file): - ''' - Parameters : - - copes: original file list selected by selectfiles node - - varcopes: original file list selected by selectfiles node - - subject_ids: list of subject IDs that are analyzed - - participants_file: str, file containing participants caracteristics - - This function return the file list containing only the files belonging to subject in the wanted group. - ''' - - from os.path import join as opj - - equalRange_id = [] - equalIndifference_id = [] - - subject_list = ['sub-' + str(i) for i in subject_ids] - - with open(participants_file, 'rt') as f: - next(f) # skip the header - - for line in f: - info = line.strip().split() - - if info[0] in subject_list and info[1] == "equalIndifference": - equalIndifference_id.append(info[0][-3:]) - elif info[0] in subject_list and info[1] == "equalRange": - equalRange_id.append(info[0][-3:]) - - copes_equalIndifference = [] - copes_equalRange = [] - varcopes_equalIndifference = [] - varcopes_equalRange = [] - - for file in copes: - sub_id = file.split('/') - if sub_id[-2][-3:] in equalIndifference_id: - copes_equalIndifference.append(file) - elif sub_id[-2][-3:] in equalRange_id: - copes_equalRange.append(file) - - for file in varcopes: - sub_id = file.split('/') - if sub_id[-2][-3:] in equalIndifference_id: - varcopes_equalIndifference.append(file) - elif sub_id[-2][-3:] in equalRange_id: - varcopes_equalRange.append(file) - - return copes_equalIndifference, copes_equalRange, varcopes_equalIndifference, varcopes_equalRange, equalIndifference_id, equalRange_id - -def get_regs(equalRange_id, equalIndifference_id, method, subject_list): - """ - Create dictionnary of regressors for group analysis. 
- - Parameters: - - equalRange_id: list of str, ids of subjects in equal range group - - equalIndifference_id: list of str, ids of subjects in equal indifference group - - method: one of "equalRange", "equalIndifference" or "groupComp" - - Returns: - - regressors: dict, dictionnary of regressors used to distinguish groups in FSL group analysis - """ - if method == "equalRange": - regressors = dict(group_mean = [1 for i in range(len(equalRange_id))]) + + + subject_info = [] + + subject_info.append(Bunch( + conditions=cond_names, + onsets=[onset[k] for k in cond_names], + durations=[duration[k] for k in cond_names], + amplitudes=[amplitude[k] for k in cond_names], + regressor_names=None, + regressors=None) + ) + + return subject_info + + # [INFO] This function creates the contrasts that will be analyzed in the first level analysis + def get_contrasts(): + ''' + Create the list of tuples that represents contrasts. + Each contrast is in the form : + (Name,Stat,[list of condition names],[weights on those conditions]) + + Parameters: + - subject_id: str, ID of the subject + + Returns: + - contrasts: list of tuples, list of contrasts to analyze + ''' + # list of condition names + conditions = ['gain', 'loss'] - elif method == "equalIndifference": - regressors = dict(group_mean = [1 for i in range(len(equalIndifference_id))]) + # create contrasts + gain = ('gain', 'T', conditions, [1, 0]) - elif method == "groupComp": - equalRange_reg = [1 for i in range(len(equalRange_id) + len(equalIndifference_id))] - equalIndifference_reg = [0 for i in range(len(equalRange_id) + len(equalIndifference_id))] + loss = ('loss', 'T', conditions, [0, 1]) - for i, sub_id in enumerate(subject_list): - if sub_id in equalIndifference_id: - index = i - equalIndifference_reg[index] = 1 - equalRange_reg[index] = 0 - - regressors = dict(equalRange = equalRange_reg, - equalIndifference = equalIndifference_reg) - - return regressors - -def get_group_workflow(subject_list, n_sub, contrast_list, method, exp_dir, output_dir, - working_dir, result_dir): - """ - Returns the group level of analysis workflow. 
- - Parameters: - - exp_dir: str, directory where raw data are stored - - result_dir: str, directory where results will be stored - - working_dir: str, name of the sub-directory for intermediate results - - output_dir: str, name of the sub-directory for final results - - subject_list: list of str, list of subject for which you want to do the preprocessing - - contrast_list: list of str, list of contrasts to analyze - - method: one of "equalRange", "equalIndifference" or "groupComp" - - n_sub: number of subject in subject_list + # contrast list + contrasts = [gain, loss] - Returns: - - l2_analysis: Nipype WorkFlow - """ - # Infosource Node - To iterate on subject and runs - infosource_3rdlevel = Node(IdentityInterface(fields = ['contrast_id', 'exp_dir', 'result_dir', - 'output_dir', 'working_dir', 'subject_list', 'method'], - exp_dir = exp_dir, result_dir = result_dir, - output_dir = output_dir, working_dir = working_dir, - subject_list = subject_list, method = method), - name = 'infosource_3rdlevel') - infosource_3rdlevel.iterables = [('contrast_id', contrast_list)] - - # Templates to select files node - copes_file = opj(output_dir, 'l2_analysis', '_contrast_id_{contrast_id}_subject_id_*', - 'cope1.nii.gz') - - varcopes_file = opj(output_dir, 'l2_analysis', '_contrast_id_{contrast_id}_subject_id_*', - 'varcope1.nii.gz') - - participants_file = opj(exp_dir, 'participants.tsv') - - template = {'cope' : copes_file, 'varcope' : varcopes_file, 'participants' : participants_file} - - # SelectFiles node - to select necessary files - selectfiles_3rdlevel = Node(SelectFiles(template, base_directory=result_dir), name = 'selectfiles_3rdlevel') - - datasink_3rdlevel = Node(DataSink(base_directory=result_dir, container=output_dir), name='datasink_3rdlevel') - - merge_copes_3rdlevel = Node(Merge(dimension = 't'), name = 'merge_copes_3rdlevel') - merge_varcopes_3rdlevel = Node(Merge(dimension = 't'), name = 'merge_varcopes_3rdlevel') - - subgroups_contrasts = Node(Function(input_names = ['copes', 'varcopes', 'subject_ids', 'participants_file'], - output_names = ['copes_equalIndifference', 'copes_equalRange', - 'varcopes_equalIndifference', 'varcopes_equalRange', - 'equalIndifference_id', 'equalRange_id'], - function = get_subgroups_contrasts), - name = 'subgroups_contrasts') - - specifymodel_3rdlevel = Node(MultipleRegressDesign(), name = 'specifymodel_3rdlevel') - - flame_3rdlevel = Node(FLAMEO(run_mode = 'flame1', mask_file = Info.standard_image('MNI152_T1_2mm_brain_mask.nii.gz')), - name='flame_3rdlevel') - - regs = Node(Function(input_names = ['equalRange_id', 'equalIndifference_id', 'method', 'subject_list'], - output_names = ['regressors'], - function = get_regs), name = 'regs') - regs.inputs.method = method - regs.inputs.subject_list = subject_list - - cluster = MapNode(Cluster(threshold = 3.1, out_threshold_file = True), name = 'cluster', - iterfield = ['in_file', 'cope_file'], synchronize = True) - - l3_analysis = Workflow(base_dir = opj(result_dir, working_dir), name = f"l3_analysis_{method}_nsub_{n_sub}") - - l3_analysis.connect([(infosource_3rdlevel, selectfiles_3rdlevel, [('contrast_id', 'contrast_id')]), - (infosource_3rdlevel, subgroups_contrasts, [('subject_list', 'subject_ids')]), - (selectfiles_3rdlevel, subgroups_contrasts, [('cope', 'copes'), ('varcope', 'varcopes'), - ('participants', 'participants_file')]), - (subgroups_contrasts, regs, [('equalRange_id', 'equalRange_id'), - ('equalIndifference_id', 'equalIndifference_id')]), - (regs, specifymodel_3rdlevel, [('regressors', 
'regressors')])]) - - - if method == 'equalIndifference' or method == 'equalRange': - specifymodel_3rdlevel.inputs.contrasts = [["group_mean", "T", ["group_mean"], [1]], ["group_mean_neg", "T", ["group_mean"], [-1]]] + return contrasts + + def get_run_level_analysis(self): + """ + Returns the first level analysis workflow. + + Parameters: + - exp_dir: str, directory where raw data are stored + - result_dir: str, directory where results will be stored + - working_dir: str, name of the sub-directory for intermediate results + - output_dir: str, name of the sub-directory for final results + - subject_list: list of str, list of subject for which you want to do the analysis + - run_list: list of str, list of runs for which you want to do the analysis + - TR: float, time repetition used during acquisition + + Returns: + - l1_analysis : Nipype WorkFlow + """ + # Infosource Node - To iterate on subject and runs + info_source = Node( + IdentityInterface( + fields = ['subject_id', 'run_id'] + ), + name = 'info_source' + ) + + info_source.iterables = [ + ('subject_id', self.subject_list), + ('run_id', self.run_list) + ] + + templates = { + 'func': join( + self.directories.output_dir, + 'preprocess', + '_run_id_{run_id}_subject_id_{subject_id}','_addmean0', + 'sub-{subject_id}_task-MGT_run-{run_id}_bold_dtype_mcf_mask_smooth_mask_gms_tempfilt_maths.nii.gz', + ), + 'event': join( + self.directories.dataset_dir, + 'sub-{subject_id}', + 'func', + 'sub-{subject_id}_task-MGT_run-{run_id}_events.tsv', + ), + 'param': join( + self.directories.output_dir, + 'preprocess', + '_run_id_{run_id}_subject_id_{subject_id}', '_realign0', + 'sub-{subject_id}_task-MGT_run-{run_id}_bold_dtype_mcf.nii.gz.par' + ) + } + + # SelectFiles node - to select necessary files + select_files = Node( + SelectFiles(templates, base_directory = self.directories.dataset_dir), + name = 'select_files' + ) + + # DataSink Node - store the wanted results in the wanted repository + data_sink = Node( + DataSink(base_directory = self.directories.output_dir), + name = 'data_sink' + ) + + # [INFO] This is the node executing the get_subject_infos_spm function + # Subject Infos node - get subject specific condition information + subject_infos = Node( + Function( + input_names = ['event_file'], + output_names = ['subject_info'], + function = self.get_subject_infos, + ), + name = 'subject_infos', + ) + subject_infos.inputs.runs = self.run_list + + # Contrasts node - to get contrasts + contrasts = Node( + Function( + output_names = ['contrasts'], + function = self.get_contrasts, + ), + name = 'contrasts', + ) + specify_model = Node( + SpecifyModel( + high_pass_filter_cutoff = 100, + input_units = 'secs', + time_repetition = 1, + ), + name = 'specify_model' + ) + + l1_design = Node( + Level1Design( + bases = {'dgamma':{'derivs' : True}}, + interscan_interval = 1, + model_serial_correlations = True, + ), + name = 'l1_design' + ) + + model_generation = Node( + FEATModel( + ), + name = 'model_generation' + ) + + model_estimate = Node(FILMGLS( + ), + name='model_estimate' + ) + + # Create l1 analysis workflow and connect its nodes + l1_analysis = Workflow( + base_dir = self.directories.working_dir, + name = "run_level_analysis" + ) + + l1_analysis.connect([ + ( + info_source, + select_files, + [('subject_id', 'subject_id'), + ('run_id', 'run_id')] + ), + ( + select_files, + subject_infos, + [('event', 'event_file')] + ), + ( + select_files, + specify_model, + [('param', 'realignment_parameters')] + ), + ( + select_files, + specify_model, + [('func', 
'functional_runs')] + ), + ( + subject_infos, + specify_model, + [('subject_info', 'subject_info')] + ), + ( + contrasts, + l1_design, + [('contrasts', 'contrasts')] + ), + ( + specify_model, + l1_design, + [('session_info', 'session_info')] + ), + ( + l1_design, + model_generation, + [('ev_files', 'ev_files'), + ('fsf_files', 'fsf_file')] + ), + ( + select_files, + model_estimate, + [('func', 'in_file')] + ), + ( + model_generation, + model_estimate, + [('con_file', 'tcon_file'), + ('design_file', 'design_file')] + ), + ( + model_estimate, + data_sink, + [('results_dir', 'run_level_analysis.@results')] + ), + ( + model_generation, + data_sink, + [('design_file', 'run_level_analysis.@design_file'), + ('design_image', 'run_level_analysis.@design_img')] + ) + ]) + + return l1_analysis + + def get_registration(self): + """ Return a Nipype worflow describing the registration part of the pipeline """ - if method == 'equalIndifference': - l3_analysis.connect([(subgroups_contrasts, merge_copes_3rdlevel, - [('copes_equalIndifference', 'in_files')]), - (subgroups_contrasts, merge_varcopes_3rdlevel, - [('varcopes_equalIndifference', 'in_files')])]) - elif method == 'equalRange': - l3_analysis.connect([(subgroups_contrasts, merge_copes_3rdlevel, [('copes_equalRange', 'in_files')]), - (subgroups_contrasts, merge_varcopes_3rdlevel, [('varcopes_equalRange', 'in_files')])]) - - elif method == "groupComp": - specifymodel_3rdlevel.inputs.contrasts = [["equalRange_sup", "T", ["equalRange", "equalIndifference"], - [1, -1]]] + # Infosource Node - To iterate on subjects + info_source = Node( + IdentityInterface( + fields = ['subject_id', 'contrast_id', 'run_id'], + ), + name='info_source', + ) + info_source.iterables = [('subject_id', self.subject_list), + ('contrast_id', self.contrast_list), + ('run_id', self.run_list)] + + # Templates to select files node + # [TODO] Change the name of the files depending on the filenames of results of preprocessing + templates = { + 'cope': join( + self.directories.output_dir, + 'run_level_analysis', + '_run_id_{run_id}_subject_id_{subject_id}', + 'results', + 'cope{contrast_id}.nii.gz', + ), + 'varcope': join( + self.directories.output_dir, + 'run_level_analysis', + '_run_id_{run_id}_subject_id_{subject_id}', + 'results', + 'varcope{contrast_id}.nii.gz', + ), + 'func2anat_transform':join( + self.directories.output_dir, + 'preprocess', + '_run_id_{run_id}_subject_id_{subject_id}', + 'sub-{subject_id}_task-MGT_run-{run_id}_bold_dtype_mean_flirt.mat' + ), + 'anat2target_transform':join( + self.directories.output_dir, + 'preprocess', + '_run_id_{run_id}_subject_id_{subject_id}', + 'sub-{subject_id}_T1w_fieldwarp.nii.gz' + ) + } + + # SelectFiles node - to select necessary files + select_files = Node( + SelectFiles(templates, base_directory = self.directories.dataset_dir), + name = 'select_files' + ) + + # DataSink Node - store the wanted results in the wanted repository + data_sink = Node( + DataSink(base_directory = self.directories.output_dir), + name = 'data_sink' + ) - l3_analysis.connect([(selectfiles_3rdlevel, merge_copes_3rdlevel, [('cope', 'in_files')]), - (selectfiles_3rdlevel, merge_varcopes_3rdlevel, [('varcope', 'in_files')])]) - - l3_analysis.connect([(merge_copes_3rdlevel, flame_3rdlevel, [('merged_file', 'cope_file')]), - (merge_varcopes_3rdlevel, flame_3rdlevel, [('merged_file', 'var_cope_file')]), - (specifymodel_3rdlevel, flame_3rdlevel, [('design_mat', 'design_file'), - ('design_con', 't_con_file'), - ('design_grp', 'cov_split_file')]), - (flame_3rdlevel, 
cluster, [('zstats', 'in_file'), - ('copes', 'cope_file')]), - (flame_3rdlevel, datasink_3rdlevel, [('zstats', - f"l3_analysis_{method}_nsub_{n_sub}.@zstats"), - ('tstats', - f"l3_analysis_{method}_nsub_{n_sub}.@tstats")]), - (cluster, datasink_3rdlevel, [('threshold_file', f"l3_analysis_{method}_nsub_{n_sub}.@thresh")])]) - - return l3_analysis - -def reorganize_results(result_dir, output_dir, n_sub, team_ID): - """ - Reorganize the results to analyze them. - - Parameters: - - result_dir: str, directory where results will be stored - - output_dir: str, name of the sub-directory for final results - - n_sub: int, number of subject used for analysis - - team_ID: str, name of the team ID for which we reorganize files - """ - from os.path import join as opj - import os - import shutil - import gzip - import nibabel as nib - import numpy as np - - h1 = opj(result_dir, output_dir, f"l3_analysis_equalIndifference_nsub_{n_sub}", '_contrast_id_1') - h2 = opj(result_dir, output_dir, f"l3_analysis_equalRange_nsub_{n_sub}", '_contrast_id_1') - h3 = opj(result_dir, output_dir, f"l3_analysis_equalIndifference_nsub_{n_sub}", '_contrast_id_1') - h4 = opj(result_dir, output_dir, f"l3_analysis_equalRange_nsub_{n_sub}", '_contrast_id_1') - h5 = opj(result_dir, output_dir, f"l3_analysis_equalIndifference_nsub_{n_sub}", '_contrast_id_2') - h6 = opj(result_dir, output_dir, f"l3_analysis_equalRange_nsub_{n_sub}", '_contrast_id_2') - h7 = opj(result_dir, output_dir, f"l3_analysis_equalIndifference_nsub_{n_sub}", '_contrast_id_2') - h8 = opj(result_dir, output_dir, f"l3_analysis_equalRange_nsub_{n_sub}", '_contrast_id_2') - h9 = opj(result_dir, output_dir, f"l3_analysis_groupComp_nsub_{n_sub}", '_contrast_id_2') - - h = [h1, h2, h3, h4, h5, h6, h7, h8, h9] - - repro_unthresh = [opj(filename, "zstat2.nii.gz") if i in [4, 5] else opj(filename, "zstat1.nii.gz") for i, filename in enumerate(h)] - - repro_thresh = [opj(filename, '_cluster1', "zstat2_threshold.nii.gz") if i in [4, 5] else opj(filename, '_cluster0', "zstat1_threshold.nii.gz") for i, filename in enumerate(h)] - - if not os.path.isdir(opj(result_dir, "NARPS-reproduction")): - os.mkdir(opj(result_dir, "NARPS-reproduction")) - - for i, filename in enumerate(repro_unthresh): - f_in = filename - f_out = opj(result_dir, "NARPS-reproduction", f"team_{team_ID}_nsub_{n_sub}_hypo{i+1}_unthresholded.nii.gz") - shutil.copyfile(f_in, f_out) + warpall_cope = MapNode( + ApplyWarp(interp='spline'), + name='warpall_cope', + iterfield=['in_file'] + ) + + warpall_cope.inputs.ref_file = Info.standard_image('MNI152_T1_2mm_brain.nii.gz') + warpall_cope.inputs.mask_file = Info.standard_image('MNI152_T1_2mm_brain_mask.nii.gz') + + warpall_varcope = MapNode( + ApplyWarp(interp='spline'), + name='warpall_varcope', + iterfield=['in_file'] + ) + + warpall_varcope.inputs.ref_file = Info.standard_image('MNI152_T1_2mm_brain.nii.gz') + warpall_varcope.inputs.mask_file = Info.standard_image('MNI152_T1_2mm_brain_mask.nii.gz') + + # Create registration workflow and connect its nodes + registration = Workflow( + base_dir = self.directories.working_dir, + name = "registration" + ) + + registration.connect([ + ( + info_source, + select_files, + [('subject_id', 'subject_id'), + ('run_id', 'run_id'), + ('contrast_id', 'contrast_id')] + ), + ( + select_files, + warpall_cope, + [('func2anat_transform', 'premat'), + ('anat2target_transform', 'field_file'), + ('cope', 'in_file')] + ), + ( + select_files, + warpall_varcope, + [('func2anat_transform', 'premat'), + ('anat2target_transform', 
'field_file'), + ('varcope', 'in_file')] + ), + ( + warpall_cope, + data_sink, + [('out_file', 'registration.@reg_cope')] + ), + ( + warpall_varcope, + data_sink, + [('out_file', 'registration.@reg_varcope')] + ) + ]) + + return registration + + + def get_subject_level_analysis(self): + """ Return a Nipype worflow describing the subject level analysis part of the pipeline """ + + # [INFO] The following part stays the same for all pipelines + + # Infosource Node - To iterate on subjects + info_source = Node( + IdentityInterface( + fields = ['subject_id', 'contrast_id'], + ), + name='info_source', + ) + info_source.iterables = [('subject_id', self.subject_list), ('contrast_id', self.contrast_list)] + + # Templates to select files node + # [TODO] Change the name of the files depending on the filenames of results of preprocessing + templates = { + 'cope': join( + self.directories.output_dir, + 'registration', + '_contrast_id_{contrast_id}_run_id_*_subject_id_{subject_id}', + '_warpall_cope0', + 'cope{contrast_id}_warp.nii.gz', + ), + 'varcope': join( + self.directories.output_dir, + 'registration', + '_contrast_id_{contrast_id}_run_id_*_subject_id_{subject_id}', + '_warpall_varcope0', + 'varcope{contrast_id}_warp.nii.gz', + ) + } + + # SelectFiles node - to select necessary files + select_files = Node( + SelectFiles(templates, base_directory = self.directories.dataset_dir), + name = 'select_files' + ) + + # DataSink Node - store the wanted results in the wanted repository + data_sink = Node( + DataSink(base_directory = self.directories.output_dir), + name = 'data_sink' + ) + + # Generate design matrix + specify_model = Node(L2Model(num_copes = len(self.run_list)), name='l2model') + + # Merge copes and varcopes files for each subject + merge_copes = Node(Merge(dimension='t'), name='merge_copes') + + merge_varcopes = Node(Merge(dimension='t'), name='merge_varcopes') + + # Second level (single-subject, mean of all four scans) analyses: Fixed effects analysis. + flame = Node(FLAMEO(run_mode = 'fe', mask_file = Info.standard_image('MNI152_T1_2mm_brain_mask.nii.gz')), + name='flameo') + + # [INFO] The following part defines the nipype workflow and the connections between nodes + + subject_level_analysis = Workflow( + base_dir = self.directories.working_dir, + name = 'subject_level_analysis' + ) + # [TODO] Add the connections the workflow needs + # [INFO] Input and output names can be found on NiPype documentation + subject_level_analysis.connect([ + ( + info_source, + select_files, + [('subject_id', 'subject_id'), + ('contrast_id', 'contrast_id')] + ), + ( + select_files, + merge_copes, + [('cope', 'in_files')] + ), + ( + select_files, + merge_varcopes, + [('varcope', 'in_files')] + ), + ( + merge_copes, + flame, + [('merged_file', 'cope_file')] + ), + ( + merge_varcopes, + flame, + [('merged_file', 'var_cope_file')] + ), + ( + specify_model, + flame, + [('design_mat', 'design_file'), + ('design_con', 't_con_file'), + ('design_grp', 'cov_split_file')] + ), + ( + flame, + data_sink, + [('zstats', 'subject_level_analysis.@stats'), + ('tstats', 'subject_level_analysis.@tstats'), + ('copes', 'subject_level_analysis.@copes'), + ('var_copes', 'subject_level_analysis.@varcopes')] + ), + ]) + + # [INFO] Here we simply return the created workflow + return subject_level_analysis + + # [INFO] This function returns the list of ids and files of each group of participants + # to do analyses for both groups, and one between the two groups. 
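+    # [INFO] Illustrative sketch (an assumption based on the NARPS participants.tsv
+    # layout, where the first two whitespace-separated columns are participant_id
+    # and group):
+    #     participant_id    group
+    #     sub-001           equalIndifference
+    #     sub-002           equalRange
+    # With subject_list = ['001', '002'], get_subgroups_contrasts puts '001' in
+    # equal_indifference_id, '002' in equal_range_id, and splits the cope/varcope
+    # file lists accordingly.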
+ def get_subgroups_contrasts( + copes, varcopes, subject_list: list, participants_file: str + ): + """ + This function return the file list containing only the files + belonging to subject in the wanted group. + + Parameters : + - copes: original file list selected by select_files node + - varcopes: original file list selected by select_files node + - subject_ids: list of subject IDs that are analyzed + - participants_file: file containing participants characteristics + + Returns : + - copes_equal_indifference : a subset of copes corresponding to subjects + in the equalIndifference group + - copes_equal_range : a subset of copes corresponding to subjects + in the equalRange group + - copes_global : a list of all copes + - varcopes_equal_indifference : a subset of varcopes corresponding to subjects + in the equalIndifference group + - varcopes_equal_range : a subset of varcopes corresponding to subjects + in the equalRange group + - equal_indifference_id : a list of subject ids in the equalIndifference group + - equal_range_id : a list of subject ids in the equalRange group + - varcopes_global : a list of all varcopes + """ + + equal_range_id = [] + equal_indifference_id = [] + + # Reading file containing participants IDs and groups + with open(participants_file, 'rt') as file: + next(file) # skip the header + + for line in file: + info = line.strip().split() - for i, filename in enumerate(repro_thresh): - f_in = filename - f_out = opj(result_dir, "NARPS-reproduction", f"team_{team_ID}_nsub_{n_sub}_hypo{i+1}_thresholded.nii.gz") - shutil.copyfile(f_in, f_out) + # Checking for each participant if its ID was selected + # and separate people depending on their group + if info[0][-3:] in subject_list and info[1] == 'equalIndifference': + equal_indifference_id.append(info[0][-3:]) + elif info[0][-3:] in subject_list and info[1] == 'equalRange': + equal_range_id.append(info[0][-3:]) + + copes_equal_indifference = [] + copes_equal_range = [] + copes_global = [] + varcopes_equal_indifference = [] + varcopes_equal_range = [] + varcopes_global = [] + + # Checking for each selected file if the corresponding participant was selected + # and add the file to the list corresponding to its group + for cope, varcope in zip(copes, varcopes): + sub_id = cope.split('/') + if sub_id[-2][-3:] in equal_indifference_id: + copes_equal_indifference.append(cope) + elif sub_id[-2][-3:] in equal_range_id: + copes_equal_range.append(cope) + if sub_id[-2][-3:] in subject_list: + copes_global.append(cope) + + sub_id = varcope.split('/') + if sub_id[-2][-3:] in equal_indifference_id: + varcopes_equal_indifference.append(varcope) + elif sub_id[-2][-3:] in equal_range_id: + varcopes_equal_range.append(varcope) + if sub_id[-2][-3:] in subject_list: + varcopes_global.append(varcope) + + return copes_equal_indifference, copes_equal_range, varcopes_equal_indifference, varcopes_equal_range,equal_indifference_id, equal_range_id,copes_global, varcopes_global + + + # [INFO] This function creates the dictionary of regressors used in FSL Nipype pipelines + def get_regressors( + equal_range_id: list, + equal_indifference_id: list, + method: str, + subject_list: list, + ) -> dict: + """ + Create dictionary of regressors for group analysis. 
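+
+        Example (illustrative): with method = 'groupComp',
+        subject_list = ['001', '002', '003', '004'],
+        equal_range_id = ['002', '004'] and equal_indifference_id = ['001', '003'],
+        this returns {'equalRange': [0, 1, 0, 1], 'equalIndifference': [1, 0, 1, 0]}.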
+
+        Parameters:
+            - equal_range_id: ids of subjects in equal range group
+            - equal_indifference_id: ids of subjects in equal indifference group
+            - method: one of "equalRange", "equalIndifference" or "groupComp"
+            - subject_list: ids of subjects for which to do the analysis
+
+        Returns:
+            - regressors: regressors used to distinguish groups in FSL group analysis
+        """
+        # For one sample t-test, creates a dictionary
+        # with a list of the size of the number of participants
+        if method == 'equalRange':
+            regressors = dict(group_mean = [1 for i in range(len(equal_range_id))])
+        elif method == 'equalIndifference':
+            regressors = dict(group_mean = [1 for i in range(len(equal_indifference_id))])
+
+        # For two sample t-test, creates 2 lists:
+        #   - one for equal range group,
+        #   - one for equal indifference group
+        # Each list contains n_sub values with 0 and 1 depending on the group of the participant
+        # For equalRange_reg list --> participants with a 1 are in the equal range group
+        elif method == 'groupComp':
+            equalRange_reg = [
+                1 for i in range(len(equal_range_id) + len(equal_indifference_id))
+            ]
+            equalIndifference_reg = [
+                0 for i in range(len(equal_range_id) + len(equal_indifference_id))
+            ]
+
+            for index, subject_id in enumerate(subject_list):
+                if subject_id in equal_indifference_id:
+                    equalIndifference_reg[index] = 1
+                    equalRange_reg[index] = 0
+
+            regressors = dict(
+                equalRange = equalRange_reg,
+                equalIndifference = equalIndifference_reg
+            )
+
+        return regressors
+
+    def get_group_level_analysis(self):
+        """
+        Return all workflows for the group level analysis.
+
+        Returns:
+            - a list of nipype.WorkFlow
+        """
+
+        methods = ['equalRange', 'equalIndifference', 'groupComp']
+        return [self.get_group_level_analysis_sub_workflow(method) for method in methods]
+
+    def get_group_level_analysis_sub_workflow(self, method):
+        """
+        Return a workflow for the group level analysis.
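+
+        Example (illustrative): get_group_level_analysis_sub_workflow('equalRange')
+        builds the one-sample t-test workflow restricted to the equalRange subgroup,
+        while 'groupComp' builds the two-sample comparison between the groups.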
+
+        Parameters:
+            - method: one of 'equalRange', 'equalIndifference' or 'groupComp'
+
+        Returns:
+            - group_level_analysis: nipype.WorkFlow
+        """
+        # [INFO] The following part stays the same for all pipelines
+
+        # Infosource node - iterate over the list of contrasts generated
+        # by the subject level analysis
+        info_source = Node(
+            IdentityInterface(
+                fields = ['contrast_id', 'subjects'],
+                subjects = self.subject_list
+            ),
+            name = 'info_source',
+        )
+        info_source.iterables = [('contrast_id', self.contrast_list)]
+
+        # Templates to select files node
+        # [TODO] Change the name of the files depending on the filenames
+        # of results of first level analysis
+        templates = {
+            'cope' : join(
+                self.directories.results_dir,
+                'subject_level_analysis',
+                '_contrast_id_{contrast_id}_subject_id_*', 'cope1.nii.gz'),
+            'varcope' : join(
+                self.directories.results_dir,
+                'subject_level_analysis',
+                '_contrast_id_{contrast_id}_subject_id_*', 'varcope1.nii.gz'),
+            'participants' : join(
+                self.directories.dataset_dir,
+                'participants.tsv')
+        }
+        select_files = Node(
+            SelectFiles(
+                templates,
+                base_directory = self.directories.results_dir,
+                force_list = True
+            ),
+            name = 'select_files',
+        )
+
+        # Datasink node - to save important files
+        data_sink = Node(
+            DataSink(base_directory = self.directories.output_dir),
+            name = 'data_sink',
+        )
+
+        subgroups_contrasts = Node(
+            Function(
+                input_names=['copes', 'varcopes', 'subject_list', 'participants_file'],
+                output_names=[
+                    'copes_equalIndifference',
+                    'copes_equalRange',
+                    'varcopes_equalIndifference',
+                    'varcopes_equalRange',
+                    'equalIndifference_id',
+                    'equalRange_id',
+                    'copes_global',
+                    'varcopes_global'
+                ],
+                function = self.get_subgroups_contrasts,
+            ),
+            name = 'subgroups_contrasts',
+        )
+
+        regs = Node(
+            Function(
+                input_names = [
+                    'equalRange_id',
+                    'equalIndifference_id',
+                    'method',
+                    'subject_list',
+                ],
+                output_names = ['regressors'],
+                function = self.get_regressors,
+            ),
+            name = 'regs',
+        )
+        regs.inputs.method = method
+        regs.inputs.subject_list = self.subject_list
+
+        # [INFO] The following part has to be modified with nodes of the pipeline
+
+        # [TODO] For each node, replace 'node_name' by an explicit name, and use it for both:
+        #   - the name of the variable in which you store the Node object
+        #   - the 'name' attribute of the Node
+        # [TODO] The node_function refers to a NiPype interface that you must import
+        # at the beginning of the file.
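+        # [INFO] Overview of the estimation chain below: Merge concatenates the
+        # subject-level cope (resp. varcope) images along the time dimension,
+        # FLAMEO in 'flame1' mode fits the mixed-effects group GLM on the merged
+        # 4D files, and Cluster thresholds the resulting z-statistic maps at z > 3.1.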
+        merge_copes = Node(
+            Merge(dimension = 't'),
+            name = 'merge_copes'
+        )
+
+        merge_varcopes = Node(
+            Merge(dimension = 't'),
+            name = 'merge_varcopes'
+        )
+
+        specify_model = Node(
+            MultipleRegressDesign(),
+            name = 'specify_model'
+        )
+
+        flame = Node(
+            FLAMEO(
+                run_mode = 'flame1',
+                mask_file = Info.standard_image('MNI152_T1_2mm_brain_mask.nii.gz')
+            ),
+            name='flame'
+        )
+
+        cluster = MapNode(
+            Cluster(
+                threshold = 3.1,
+                out_threshold_file = True
+            ),
+            name = 'cluster',
+            iterfield = ['in_file', 'cope_file'],
+            synchronize = True
+        )
+
+        # [INFO] The following part defines the nipype workflow and the connections between nodes
+
+        # Compute the number of participants used to do the analysis
+        nb_subjects = len(self.subject_list)
+
+        # Declare the workflow
+        group_level_analysis = Workflow(
+            base_dir = self.directories.working_dir,
+            name = f'group_level_analysis_{method}_nsub_{nb_subjects}'
+        )
+        group_level_analysis.connect(
+            [
+                (
+                    info_source,
+                    select_files,
+                    [('contrast_id', 'contrast_id')],
+                ),
+                (
+                    info_source,
+                    subgroups_contrasts,
+                    [('subjects', 'subject_list')],
+                ),
+                (
+                    select_files,
+                    subgroups_contrasts,
+                    [
+                        ('cope', 'copes'),
+                        ('varcope', 'varcopes'),
+                        ('participants', 'participants_file'),
+                    ],
+                ),
+                (
+                    subgroups_contrasts,
+                    regs,
+                    [
+                        ('equalRange_id', 'equalRange_id'),
+                        ('equalIndifference_id', 'equalIndifference_id')
+                    ]
+                ),
+                (
+                    regs,
+                    specify_model,
+                    [('regressors', 'regressors')]
+                )
+            ]
+        )
+
+        # [INFO] Here we define the contrasts used for the group level analysis,
+        # depending on the method used. Contrast condition names must match the
+        # regressor names returned by get_regressors.
+        if method in ('equalRange', 'equalIndifference'):
+            contrasts = [
+                ('group_mean', 'T', ['group_mean'], [1]),
+                ('group_mean_neg', 'T', ['group_mean'], [-1])
+            ]
+
+            if method == 'equalIndifference':
+                group_level_analysis.connect([
+                    (
+                        subgroups_contrasts,
+                        merge_copes,
+                        [('copes_equalIndifference', 'in_files')]
+                    ),
+                    (
+                        subgroups_contrasts,
+                        merge_varcopes,
+                        [('varcopes_equalIndifference', 'in_files')]
+                    )
+                ])
+
+            elif method == 'equalRange':
+                group_level_analysis.connect([
+                    (
+                        subgroups_contrasts,
+                        merge_copes,
+                        [('copes_equalRange', 'in_files')]
+                    ),
+                    (
+                        subgroups_contrasts,
+                        merge_varcopes,
+                        [('varcopes_equalRange', 'in_files')]
+                    )
+                ])
+
+        elif method == 'groupComp':
+            contrasts = [
+                ('Eq range vs Eq indiff in loss', 'T',
+                    ['equalRange', 'equalIndifference'], [1, -1])
+            ]
+
+            group_level_analysis.connect([
+                (
+                    select_files,
+                    merge_copes,
+                    [('cope', 'in_files')]
+                ),
+                (
+                    select_files,
+                    merge_varcopes,
+                    [('varcope', 'in_files')]
+                )
+            ])
+
+        specify_model.inputs.contrasts = contrasts
+
+        group_level_analysis.connect([
+            (
+                merge_copes,
+                flame,
+                [('merged_file', 'cope_file')]
+            ),
+            (
+                merge_varcopes,
+                flame,
+                [('merged_file', 'var_cope_file')]
+            ),
+            (
+                specify_model,
+                flame,
+                [
+                    ('design_mat', 'design_file'),
+                    ('design_con', 't_con_file'),
+                    ('design_grp', 'cov_split_file')
+                ]
+            ),
+            (
+                flame,
+                cluster,
+                [
+                    ('zstats', 'in_file'),
+                    ('copes', 'cope_file')
+                ]
+            ),
+            (
+                flame,
+                data_sink,
+                [
+                    ('zstats', f"group_level_analysis_{method}_nsub_{nb_subjects}.@zstats"),
+                    ('tstats', f"group_level_analysis_{method}_nsub_{nb_subjects}.@tstats")
+                ]
+            ),
+            (
+                cluster,
+                data_sink,
+                [('threshold_file', f"group_level_analysis_{method}_nsub_{nb_subjects}.@thresh")]
+            )
+        ])
+
+        # [INFO] Here we simply return the created workflow
+        return group_level_analysis

From 6e74fbd1c665b578a51c6c5d216f81791d6ad96c Mon Sep 17 00:00:00 2001 From: elodiegermani1 Date: Thu, 20 
Jul 2023 17:17:29 -0400 Subject: [PATCH 02/11] [1KB2] Reproduction test file --- tests/pipelines/test_team_1KB2.py | 90 +++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 tests/pipelines/test_team_1KB2.py

diff --git a/tests/pipelines/test_team_1KB2.py b/tests/pipelines/test_team_1KB2.py
new file mode 100644
index 00000000..7e4f39e3
--- /dev/null
+++ b/tests/pipelines/test_team_1KB2.py
@@ -0,0 +1,90 @@
+#!/usr/bin/python
+# coding: utf-8
+
+""" Tests of the 'narps_open.pipelines.team_2T6S' module.
+
+Launch this test with PyTest
+
+Usage:
+======
+    pytest -q test_team_1KB2.py
+    pytest -q test_team_1KB2.py -k
+"""
+
+from statistics import mean
+
+from pytest import raises, helpers, mark
+from nipype import Workflow
+
+from narps_open.pipelines.team_1KB2_debug import PipelineTeam1KB2
+
+class TestPipelinesTeam2T6S:
+    """ A class that contains all the unit tests for the PipelineTeam2T6S class."""
+
+    @staticmethod
+    @mark.unit_test
+    def test_create():
+        """ Test the creation of a PipelineTeam1KB2 object """
+
+        pipeline = PipelineTeam1KB2()
+
+        # 1 - check the parameters
+        assert pipeline.team_id == '1KB2'
+
+        # 2 - check workflows
+        assert isinstance(pipeline.get_preprocessing(), Workflow)
+        assert isinstance(pipeline.get_run_level_analysis(), Workflow)
+        assert isinstance(pipeline.get_registration(), Workflow)
+        assert isinstance(pipeline.get_subject_level_analysis(), Workflow)
+        group_level = pipeline.get_group_level_analysis()
+
+        assert len(group_level) == 3
+        for sub_workflow in group_level:
+            assert isinstance(sub_workflow, Workflow)
+
+    @staticmethod
+    @mark.unit_test
+    def test_outputs():
+        """ Test the expected outputs of a PipelineTeam1KB2 object """
+        pipeline = PipelineTeam1KB2()
+        # 1 - 1 subject output, 1 run
+        pipeline.subject_list = ['001']
+        pipeline.run_list = ['01']
+        pipeline.contrast_list = ['1','2']
+        assert len(pipeline.get_preprocessing_outputs()) == 6
+        #assert len(pipeline.get_run_level_outputs()) == 0
+        #assert len(pipeline.get_registration()) == 0
+        #assert len(pipeline.get_subject_level_outputs()) ==
+        #assert len(pipeline.get_group_level_outputs()) == 84
+
+        # 2 - 1 subject output, 4 runs
+        pipeline.subject_list = ['001']
+        pipeline.run_list = ['01', '02', '03', '04']
+        pipeline.contrast_list = ['1','2']
+        assert len(pipeline.get_preprocessing_outputs()) == 24
+        #assert len(pipeline.get_run_level_outputs()) == 0
+        #assert len(pipeline.get_registration()) == 0
+        #assert len(pipeline.get_subject_level_outputs()) == 36
+        #assert len(pipeline.get_group_level_outputs()) == 84
+
+        # 3 - 4 subject outputs, 4 runs
+        pipeline.subject_list = ['001', '002', '003', '004']
+        pipeline.run_list = ['01', '02', '03', '04']
+        pipeline.contrast_list = ['1','2']
+        assert len(pipeline.get_preprocessing_outputs()) == 96
+        #assert len(pipeline.get_run_level_outputs()) == 0
+        #assert len(pipeline.get_registration()) == 0
+        #assert len(pipeline.get_subject_level_outputs()) == 36
+        #assert len(pipeline.get_group_level_outputs()) == 84
+
+    @staticmethod
+    @mark.pipeline_test
+    def test_execution():
+        """ Test the execution of a PipelineTeam1KB2 and compare results """
+        results_4_subjects = helpers.test_pipeline(
+            '1KB2',
+            '/references/',
+            '/data/',
+            '/output/',
+            4)
+        assert mean(results_4_subjects) > .003

From df88329676f1cb3106fcb4e697e9e610065a3f6f Mon Sep 17 00:00:00 2001 From: elodiegermani1 Date: Thu, 20 Jul 2023 17:44:06 -0400 Subject: [PATCH 03/11] [1KB2] Implement tests to check number of outputs for preprocessing --- .../{team_1KB2_debug.py => 
team_1KB2.py} | 45 +++++++++++++++++++ tests/pipelines/test_team_1KB2.py | 2 +- 2 files changed, 46 insertions(+), 1 deletion(-) rename narps_open/pipelines/{team_1KB2_debug.py => team_1KB2.py} (95%) diff --git a/narps_open/pipelines/team_1KB2_debug.py b/narps_open/pipelines/team_1KB2.py similarity index 95% rename from narps_open/pipelines/team_1KB2_debug.py rename to narps_open/pipelines/team_1KB2.py index 257da340..fb04da33 100755 --- a/narps_open/pipelines/team_1KB2_debug.py +++ b/narps_open/pipelines/team_1KB2.py @@ -162,6 +162,51 @@ def get_preprocessing(self): # [INFO] Here we simply return the created workflow return preprocessing + def get_preprocessing_outputs(self): + """ Return the names of the files the preprocessing is supposed to generate. """ + + templates = [join( + self.directories.output_dir, + 'l1_analysis', f'run_id_{run_id}'+'_subject_id_{subject_id}', '_addmean0', + 'sub-{subject_id}_'+f'task-MGT_run-{run_id}_bold_dtype_mcf_mask_smooth_mask_gms_tempfilt_maths.nii.gz')\ + for run_id in self.run_list] + + templates += [join( + self.directories.output_dir, + 'l1_analysis', f'run_id_{run_id}'+'_subject_id_{subject_id}', '_dilatemask0', + 'sub-{subject_id}_'+f'task-MGT_run-{run_id}_bold_dtype_mcf_bet_thresh_dil.nii.gz')\ + for run_id in self.run_list] + + templates += [join( + self.directories.output_dir, + 'l1_analysis', f'run_id_{run_id}'+'_subject_id_{subject_id}', '_maskfunc30', + 'sub-{subject_id}_'+f'task-MGT_run-{run_id}_bold_dtype_mcf_mask_smooth_mask.nii.gz')\ + for run_id in self.run_list] + + templates += [join( + self.directories.output_dir, + 'l1_analysis', f'run_id_{run_id}'+'_subject_id_{subject_id}', '_realign0', + 'sub-{subject_id}_'+f'task-MGT_run-{run_id}_bold_dtype_mcf.nii.gz.par')\ + for run_id in self.run_list] + + templates += [join( + self.directories.output_dir, + 'l1_analysis', f'run_id_{run_id}'+'_subject_id_{subject_id}', + 'sub-{subject_id}_'+f'T1w_fieldwarp.nii.gz')] + + templates += [join( + self.directories.output_dir, + 'l1_analysis', f'run_id_{run_id}'+'_subject_id_{subject_id}', + 'sub-{subject_id}_'+f'task-MGT_run-{run_id}_bold_dtype_mean_flirt.mat')\ + for run_id in self.run_list] + + # Format with subject_ids + return_list = [] + for template in templates: + return_list += [template.format(subject_id = s) for s in self.subject_list] + + return return_list + # [INFO] This function is used in the subject level analysis pipelines using FSL def get_subject_infos(event_file: str): """ diff --git a/tests/pipelines/test_team_1KB2.py b/tests/pipelines/test_team_1KB2.py index 7e4f39e3..4d7adc98 100644 --- a/tests/pipelines/test_team_1KB2.py +++ b/tests/pipelines/test_team_1KB2.py @@ -18,7 +18,7 @@ from narps_open.pipelines.team_1KB2_debug import PipelineTeam1KB2 -class TestPipelinesTeam2T6S: +class TestPipelinesTeam1KB2: """ A class that contains all the unit tests for the PipelineTeam2T6S class.""" @staticmethod From 7b3e98ef3c87e2d1e49f6563b0235257695e9334 Mon Sep 17 00:00:00 2001 From: elodiegermani1 Date: Thu, 20 Jul 2023 17:45:39 -0400 Subject: [PATCH 04/11] [1KB2] Implement tests to check number of outputs for preprocessing --- tests/pipelines/test_team_1KB2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/pipelines/test_team_1KB2.py b/tests/pipelines/test_team_1KB2.py index 4d7adc98..6fdcc459 100644 --- a/tests/pipelines/test_team_1KB2.py +++ b/tests/pipelines/test_team_1KB2.py @@ -16,7 +16,7 @@ from pytest import raises, helpers, mark from nipype import Workflow -from narps_open.pipelines.team_1KB2_debug 
import PipelineTeam1KB2 +from narps_open.pipelines.team_1KB2 import PipelineTeam1KB2 class TestPipelinesTeam1KB2: """ A class that contains all the unit tests for the PipelineTeam2T6S class.""" From 9a1d6d93718a3ab9105133233846f4c65a901671 Mon Sep 17 00:00:00 2001 From: elodiegermani1 Date: Thu, 20 Jul 2023 17:48:40 -0400 Subject: [PATCH 05/11] [1KB2] Implement tests to check number of outputs for preprocessing --- tests/pipelines/test_team_1KB2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/pipelines/test_team_1KB2.py b/tests/pipelines/test_team_1KB2.py index 6fdcc459..e84b6247 100644 --- a/tests/pipelines/test_team_1KB2.py +++ b/tests/pipelines/test_team_1KB2.py @@ -1,7 +1,7 @@ #!/usr/bin/python # coding: utf-8 -""" Tests of the 'narps_open.pipelines.team_2T6S' module. +""" Tests of the 'narps_open.pipelines.team_1KB2' module. Launch this test with PyTest From ced9205f9567ae153603bd684da2bda3d1d63a7e Mon Sep 17 00:00:00 2001 From: elodiegermani1 Date: Fri, 21 Jul 2023 09:29:08 -0400 Subject: [PATCH 06/11] [1KB2] Requirements for testing. --- setup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index dc87a529..4c992601 100644 --- a/setup.py +++ b/setup.py @@ -18,7 +18,8 @@ 'tomli>=2.0.1,<2.1', 'networkx>=2.0,<3.0', # a workaround to nipype's bug (issue 3530) 'nipype', - 'pandas' + 'pandas', + 'niflow-nipype1-workflows' ] extras_require = { 'tests': [ From 004859c96b9f5998937197142bd1828fc574e2f0 Mon Sep 17 00:00:00 2001 From: elodiegermani1 Date: Fri, 21 Jul 2023 09:41:54 -0400 Subject: [PATCH 07/11] [1KB2] Correction for testing. --- narps_open/pipelines/team_1KB2.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/narps_open/pipelines/team_1KB2.py b/narps_open/pipelines/team_1KB2.py index fb04da33..f9f2bdc0 100755 --- a/narps_open/pipelines/team_1KB2.py +++ b/narps_open/pipelines/team_1KB2.py @@ -37,10 +37,9 @@ class PipelineTeam1KB2(Pipeline): """ A class that defines the pipeline of team 1KB2 """ def __init__(self): - # [INFO] Remove the init method completely if unused - # [TODO] Init the attributes of the pipeline, if any other than the ones defined - # in the pipeline class - pass + super().__init__() + self.team_id = '1KB2' + self.contrast_list = ['0001', '0002', '0003', '0004'] def get_preprocessing(self): """ Return a Nipype worflow describing the prerpocessing part of the pipeline """ From f554a1860cbb2962ebf7e16175cca2b4d1be2d12 Mon Sep 17 00:00:00 2001 From: elodiegermani1 Date: Fri, 21 Jul 2023 10:45:51 -0400 Subject: [PATCH 08/11] [1KB2] Correction test: add of fake path to FSL. --- narps_open/pipelines/team_1KB2.py | 14 ++------------ tests/pipelines/test_team_1KB2.py | 7 +++++-- 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/narps_open/pipelines/team_1KB2.py b/narps_open/pipelines/team_1KB2.py index f9f2bdc0..e4e190b5 100755 --- a/narps_open/pipelines/team_1KB2.py +++ b/narps_open/pipelines/team_1KB2.py @@ -1,17 +1,6 @@ #!/usr/bin/python # coding: utf-8 -""" -This template can be use to reproduce a pipeline using FSL as main software. - -- Replace all occurences of 48CD by the actual id of the team. -- All lines beging [INFO], are meant to help you during the reproduction, these can be removed -eventually. -- Also remove lines beging with [TODO], once you did what they suggested. 
-""" - -# [TODO] Only import modules you use further in te code, remove others from the import section - from os.path import join # [INFO] The import of base objects from Nipype, to create Workflows @@ -191,7 +180,8 @@ def get_preprocessing_outputs(self): templates += [join( self.directories.output_dir, 'l1_analysis', f'run_id_{run_id}'+'_subject_id_{subject_id}', - 'sub-{subject_id}_'+f'T1w_fieldwarp.nii.gz')] + 'sub-{subject_id}_'+f'T1w_fieldwarp.nii.gz')\ + for run_id in self.run_list] templates += [join( self.directories.output_dir, diff --git a/tests/pipelines/test_team_1KB2.py b/tests/pipelines/test_team_1KB2.py index e84b6247..080d0b1f 100644 --- a/tests/pipelines/test_team_1KB2.py +++ b/tests/pipelines/test_team_1KB2.py @@ -12,6 +12,7 @@ """ from statistics import mean +from os import environ from pytest import raises, helpers, mark from nipype import Workflow @@ -19,12 +20,14 @@ from narps_open.pipelines.team_1KB2 import PipelineTeam1KB2 class TestPipelinesTeam1KB2: - """ A class that contains all the unit tests for the PipelineTeam2T6S class.""" + """ A class that contains all the unit tests for the PipelineTeam1KB2 class.""" @staticmethod @mark.unit_test def test_create(): """ Test the creation of a PipelineTeam1KB2 object """ + # Defines fake environment variable + patch.dict(environ, {'FSLDIR': '/fake/path/to/fsl'}) pipeline = PipelineTeam1KB2() @@ -45,7 +48,7 @@ def test_create(): @staticmethod @mark.unit_test def test_outputs(): - """ Test the expected outputs of a PipelineTeam2T6S object """ + """ Test the expected outputs of a PipelineTeam1KB2 object """ pipeline = PipelineTeam1KB2() # 1 - 1 suject outputs, 1 run pipeline.subject_list = ['001'] From 736d2ffa9f6f5b832187e1d8d32ec1a0a4c543ac Mon Sep 17 00:00:00 2001 From: elodiegermani1 Date: Fri, 21 Jul 2023 10:49:12 -0400 Subject: [PATCH 09/11] [1KB2] Correction test: add of fake path to FSL. --- tests/pipelines/test_team_1KB2.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/pipelines/test_team_1KB2.py b/tests/pipelines/test_team_1KB2.py index 080d0b1f..f343777a 100644 --- a/tests/pipelines/test_team_1KB2.py +++ b/tests/pipelines/test_team_1KB2.py @@ -24,10 +24,10 @@ class TestPipelinesTeam1KB2: @staticmethod @mark.unit_test - def test_create(): + def test_create(mocker): # Add mocker as argument """ Test the creation of a PipelineTeam1KB2 object """ # Defines fake environment variable - patch.dict(environ, {'FSLDIR': '/fake/path/to/fsl'}) + mocker.patch.dict(environ, {'FSLDIR': '/fake/path/to/fsl'}) pipeline = PipelineTeam1KB2() From 299b5262bf7d9b6ce01fd76592cfed50827c1bb5 Mon Sep 17 00:00:00 2001 From: elodiegermani1 Date: Tue, 15 Aug 2023 10:07:48 -0400 Subject: [PATCH 10/11] Start reproduction of pipeline 43FJ --- .../{team_43FJ_debug.py => team_43FJ.py} | 0 tests/pipelines/test_team_43FJ.py | 93 +++++++++++++++++++ 2 files changed, 93 insertions(+) rename narps_open/pipelines/{team_43FJ_debug.py => team_43FJ.py} (100%) create mode 100644 tests/pipelines/test_team_43FJ.py diff --git a/narps_open/pipelines/team_43FJ_debug.py b/narps_open/pipelines/team_43FJ.py similarity index 100% rename from narps_open/pipelines/team_43FJ_debug.py rename to narps_open/pipelines/team_43FJ.py diff --git a/tests/pipelines/test_team_43FJ.py b/tests/pipelines/test_team_43FJ.py new file mode 100644 index 00000000..f343777a --- /dev/null +++ b/tests/pipelines/test_team_43FJ.py @@ -0,0 +1,93 @@ +#!/usr/bin/python +# coding: utf-8 + +""" Tests of the 'narps_open.pipelines.team_1KB2' module. 
+ +Launch this test with PyTest + +Usage: +====== + pytest -q test_team_2T6S.py + pytest -q test_team_2T6S.py -k +""" + +from statistics import mean +from os import environ + +from pytest import raises, helpers, mark +from nipype import Workflow + +from narps_open.pipelines.team_1KB2 import PipelineTeam1KB2 + +class TestPipelinesTeam1KB2: + """ A class that contains all the unit tests for the PipelineTeam1KB2 class.""" + + @staticmethod + @mark.unit_test + def test_create(mocker): # Add mocker as argument + """ Test the creation of a PipelineTeam1KB2 object """ + # Defines fake environment variable + mocker.patch.dict(environ, {'FSLDIR': '/fake/path/to/fsl'}) + + pipeline = PipelineTeam1KB2() + + # 1 - check the parameters + assert pipeline.team_id == '1KB2' + + # 2 - check workflows + assert isinstance(pipeline.get_preprocessing(), Workflow) + assert isinstance(pipeline.get_run_level_analysis(), Workflow) + assert isinstance(pipeline.registration(), Workflow) + assert isinstance(pipeline.get_subject_level_analysis(), Workflow) + group_level = pipeline.get_group_level_analysis() + + assert len(group_level) == 3 + for sub_workflow in group_level: + assert isinstance(sub_workflow, Workflow) + + @staticmethod + @mark.unit_test + def test_outputs(): + """ Test the expected outputs of a PipelineTeam1KB2 object """ + pipeline = PipelineTeam1KB2() + # 1 - 1 suject outputs, 1 run + pipeline.subject_list = ['001'] + pipeline.run_list = ['01'] + pipeline.contrast_list = ['1','2'] + assert len(pipeline.get_preprocessing_outputs()) == 6 + #assert len(pipeline.get_run_level_outputs()) == 0 + #assert len(pipeline.get_registration()) == 0 + #assert len(pipeline.get_subject_level_outputs()) == + #assert len(pipeline.get_group_level_outputs()) == 84 + + # 2 - 1 suject outputs, 4 runs + pipeline.subject_list = ['001'] + pipeline.run_list = ['01', '02', '03', '04'] + pipeline.contrast_list = ['1','2'] + assert len(pipeline.get_preprocessing_outputs()) == 24 + #assert len(pipeline.get_run_level_outputs()) == 0 + #assert len(pipeline.get_registration()) == 0 + #assert len(pipeline.get_subject_level_outputs()) == 36 + #assert len(pipeline.get_group_level_outputs()) == 84 + + # 3 - 4 suject outputs, 4 runs + pipeline.subject_list = ['001', '002', '003', '004'] + pipeline.run_list = ['01', '02', '03', '04'] + pipeline.contrast_list = ['1','2'] + assert len(pipeline.get_preprocessing_outputs()) == 96 + #assert len(pipeline.get_run_level_outputs()) == 0 + #assert len(pipeline.get_registration()) == 0 + #assert len(pipeline.get_subject_level_outputs()) == 36 + #assert len(pipeline.get_group_level_outputs()) == 84 + + @staticmethod + @mark.pipeline_test + def test_execution(): + """ Test the execution of a PipelineTeam1KB2 and compare results """ + results_4_subjects = helpers.test_pipeline( + '1KB2', + '/references/', + '/data/', + '/output/', + 4) + assert mean(results_4_subjects) > .003 From 208f42ac37a5c669cf2a29218e22c94701934097 Mon Sep 17 00:00:00 2001 From: elodiegermani1 Date: Tue, 15 Aug 2023 15:00:50 -0400 Subject: [PATCH 11/11] Preprocessing pipeline - to test --- narps_open/pipelines/team_43FJ.py | 1249 ++++++++++++++++++++++++++--- 1 file changed, 1123 insertions(+), 126 deletions(-) diff --git a/narps_open/pipelines/team_43FJ.py b/narps_open/pipelines/team_43FJ.py index 08be5ec9..7a5ebf9c 100755 --- a/narps_open/pipelines/team_43FJ.py +++ b/narps_open/pipelines/team_43FJ.py @@ -1,139 +1,1136 @@ -from nipype.interfaces.fsl import (BET, FAST, MCFLIRT, FLIRT, FNIRT, ApplyWarp, SUSAN, MotionOutliers, - 
-                                    Info, ImageMaths, IsotropicSmooth, Threshold, Level1Design, FEATModel,
-                                    L2Model, Merge, FLAMEO, ContrastMgr, FILMGLS, Randomise, MultipleRegressDesign)
-from nipype.algorithms.modelgen import SpecifyModel
+#!/usr/bin/python
+# coding: utf-8
+
+from os.path import join
+
+# [INFO] The import of base objects from Nipype, to create Workflows
+from nipype import Node, Workflow, MapNode
+
+# [INFO] a list of interfaces used to manipulate data
 from nipype.interfaces.utility import IdentityInterface, Function
 from nipype.interfaces.io import SelectFiles, DataSink
-from nipype.algorithms.misc import Gunzip
-from nipype import Workflow, Node, MapNode
-from nipype.interfaces.base import Bunch
-
-from os.path import join as opj
-import os
-
-"""
-Preprocessing was conducted using FEAT (FMRI Expert Analysis Tool) v 6.00, part of FSL version 5.0.10 (FMRIB Software Library, www.fmrib.ox.ac.uk/fsl).
-- Preprocessing consisted of nonbrain removal using BET (Brain Extraction Tool for FSL),
-- high-pass filtering (100-s cutoff),
-- and spatial smoothing using a Gaussian kernel of FWHM 5 mm.
-- Motion correction was performed with MCFLIRT (intra-modal motion correction tool based on optimization and registration techniques in FSL’s registration tool FLIRT) using 24 standard and extended regressors (six motion parameters, the derivatives of those parameters, and the squares of the derivatives and the original parameters).
-Additional spike regressors created using fsl_motion_outliers (frame displacement threshold=75th percentile plus 1.5 times the interquartile range) were also included.
-- Each participant’s functional data were registered to their T1 weighted anatomical image using boundary based registration (BBR; Greve & Fischl, 2009) and then to MNI (Montreal Neurological Institute) stereotaxic space with 12 degrees of freedom via FLIRT (FMRIB’s Linear Image Registration Tool).
-Alignment was visually confirmed for all participants. FILM (FMRIB’s Improved Linear Model) prewhitening was performed to estimate voxelwise autocorrelation and improve estimation efficiency.
-"""
-
-def get_preprocessing(exp_dir, result_dir, working_dir, output_dir, subject_list, run_list, fwhm):
-    """
-    Returns the preprocessing workflow.
-
-    Parameters:
-    - exp_dir: str, directory where raw data are stored
-    - result_dir: str, directory where results will be stored
-    - working_dir: str, name of the sub-directory for intermediate results
-    - output_dir: str, name of the sub-directory for final results
-    - subject_list: list of str, list of subject for which you want to do the preprocessing
-    - run_list: list of str, list of runs for which you want to do the preprocessing
-    - fwhm: float, fwhm for smoothing step
-
-    Returns:
-    - preprocessing: Nipype WorkFlow
-    """
-    infosource_preproc = Node(IdentityInterface(fields = ['subject_id', 'run_id']),
-                              name = 'infosource_preproc')
-
-    infosource_preproc.iterables = [('subject_id', subject_list), ('run_id', run_list)]
-
-    # Templates to select files node
-    anat_file = opj('sub-{subject_id}', 'anat',
-                    'sub-{subject_id}_T1w.nii.gz')
-
-    func_file = opj('sub-{subject_id}', 'func',
-                    'sub-{subject_id}_task-MGT_run-{run_id}_bold.nii.gz')
-
-    template = {'anat' : anat_file, 'func' : func_file}
-
-    # SelectFiles node - to select necessary files
-    selectfiles_preproc = Node(SelectFiles(template, base_directory=exp_dir), name = 'selectfiles_preproc')
-
-    """
-    Nonbrain removal was performed using BET (Brain Extraction Tool for FSL).
- Default fractional intensity threshold of .5 and vertical gradient of 0 were used. - """ +# from nipype.algorithms.misc import Gunzip - skullstrip = Node(BET(frac = 0.5, robust = True, vertical_gradient = 0), name = 'skullstrip') +# [INFO] a list of FSL-specific interfaces +from nipype.interfaces.fsl import (BET, FAST, MCFLIRT, FLIRT, FNIRT, ApplyWarp, SUSAN, + Info, ImageMaths, IsotropicSmooth, Threshold, Level1Design, FEATModel, + L2Model, Merge, FLAMEO, ContrastMgr,Cluster, FILMGLS, Randomise, + MultipleRegressDesign, ImageStats, ExtractROI, PlotMotionParams, MeanImage, + MotionOutliers) +from nipype.algorithms.modelgen import SpecifyModel +from niflow.nipype1.workflows.fmri.fsl import create_reg_workflow, create_featreg_preproc - fast = Node(FAST(), name='fast') - #register.connect(stripper, 'out_file', fast, 'in_files') +# [INFO] In order to inherit from Pipeline +from narps_open.pipelines import Pipeline - binarize = Node(ImageMaths(op_string='-nan -thr 0.5 -bin'), name='binarize') - pickindex = lambda x, i: x[i] - #register.connect(fast, ('partial_volume_files', pickindex, 2), binarize, - # 'in_file') - - """ - Motion correction was performed with MCFLIRT (intra-modal motion correction tool based on optimization and registration techniques in FSL’s registration tool FLIRT) using 24 standard and extended regressors (six motion parameters, the derivatives of those parameters, and the squares of the derivatives and the original parameters). - Additional spike regressors created using fsl_motion_outliers (frame displacement threshold=75th percentile plus 1.5 times the interquartile range) were also included. - Middle volume default was used as the reference image. - All optimizations use trilinear interpolation. B0 and fieldmap unwarping were not used. - """ - - motion_correction = Node(MCFLIRT(mean_vol = True, save_rms = True, save_plots=True, save_mats=True), name = 'motion_correction') - - motion_outliers = Node(MotionOutliers(), name = 'motion_outliers') - - """ - Each participant’s functional data were registered via linear interpolation to their T1 weighted anatomical image using boundary based registration (BBR; Greve & Fischl, 2009) and then to MNI (Montreal Neurological Institute) stereotaxic space with 12 degrees of freedom via FLIRT (FMRIB’s Linear Image Registration Tool). 
- """ +class PipelineTeam43FJ(Pipeline): + """ A class that defines the pipeline of team 43FJ """ + + def __init__(self): + super().__init__() + self.team_id = '43FJ' + self.contrast_list = ['0001', '0002', '0003', '0004'] + + def get_preprocessing(self): + """ Return a Nipype worflow describing the prerpocessing part of the pipeline """ - mean2anat = Node(FLIRT(dof = 12), name = 'mean2anat') + # [INFO] The following part stays the same for all preprocessing pipelines - mean2anat_bbr = Node(FLIRT(dof = 12, cost = 'bbr', schedule = opj(os.getenv('FSLDIR'), 'etc/flirtsch/bbr.sch')), - name = 'mean2anat_bbr') + # IdentityInterface node - allows to iterate over subjects and runs + info_source = Node( + IdentityInterface(fields=['subject_id', 'run_id']), + name='info_source' + ) + info_source.iterables = [ + ('subject_id', self.subject_list), + ('run_id', self.run_list), + ] - anat2mni_linear = Node(FLIRT(dof = 12, reference = Info.standard_image('MNI152_T1_2mm_brain_mask.nii.gz')), - name = 'anat2mni_linear') + # Templates to select files node + file_templates = { + 'anat': join( + 'sub-{subject_id}', 'anat', 'sub-{subject_id}_T1w.nii.gz' + ), + 'func': join( + 'sub-{subject_id}', 'func', 'sub-{subject_id}_task-MGT_run-{run_id}_bold.nii.gz' + ) + } - anat2mni_nonlinear = Node(FNIRT(fieldcoeff_file = True, ref_file = Info.standard_image('MNI152_T1_2mm_brain_mask.nii.gz')), - name = 'anat2mni_nonlinear') + # SelectFiles node - to select necessary files + select_files = Node( + SelectFiles(file_templates, base_directory = self.directories.dataset_dir), + name='select_files' + ) - warp_all = Node(ApplyWarp(interp='spline', ref_file = Info.standard_image('MNI152_T1_2mm_brain_mask.nii.gz')), - name='warp_all') + # DataSink Node - store the wanted results in the wanted repository + data_sink = Node( + DataSink(base_directory = self.directories.output_dir), + name='data_sink', + ) + + img2float = Node( + ImageMaths(out_data_type='float', op_string='', suffix='_dtype'), + name='img2float', + ) - """ - Feat FWHM 5mm applied to each volume of functional data. 
- """ - - smooth = Node(SUSAN(brightness_threshold = 2000, fwhm = 5), name = 'smooth') - - datasink = Node(DataSink(base_directory=result_dir, container=output_dir), name='datasink') - - preprocessing = Workflow(base_dir = opj(result_dir, working_dir), name = "preprocessing") - - preprocessing.connect([(infosource_preproc, selectfiles_preproc, [('subject_id', 'subject_id'), - ('run_id', 'run_id')]), - (selectfiles_preproc, skullstrip, [('anat', 'in_file')]), - (selectfiles_preproc, motion_correction, [('func', 'in_file')]), - (selectfiles_preproc, motion_outliers, [('func', 'in_file')]), - (skullstrip, fast, [('out_file', 'in_files')]), - (fast, binarize, [(('partial_volume_files', pickindex, 2), 'in_file')]), - (motion_correction, mean2anat, [('mean_img', 'in_file')]), - (skullstrip, mean2anat, [('out_file', 'reference')]), - (motion_correction, mean2anat_bbr, [('mean_img', 'in_file')]), - (binarize, mean2anat_bbr, [('out_file', 'wm_seg')]), - (selectfiles_preproc, mean2anat_bbr, [('anat', 'reference')]), - (mean2anat, mean2anat_bbr, [('out_matrix_file', 'in_matrix_file')]), - (skullstrip, anat2mni_linear, [('out_file', 'in_file')]), - (anat2mni_linear, anat2mni_nonlinear, [('out_matrix_file', 'affine_file')]), - (selectfiles_preproc, anat2mni_nonlinear, [('anat', 'in_file')]), - (motion_correction, warp_all, [('out_file', 'in_file')]), - (mean2anat_bbr, warp_all, [('out_matrix_file', 'premat')]), - (anat2mni_nonlinear, warp_all, [('fieldcoeff_file', 'field_file')]), - (warp_all, smooth, [('out_file', 'in_file')]), - (smooth, datasink, [('smoothed_file', 'preprocess.@smoothed_file')]), - (motion_correction, datasink, [('par_file', 'preprocess.@parameters_file')]), - (motion_outliers, datasink, [('out_metric_values', 'preprocess.@outliers'), ('out_metric_plot', 'preprocess.@outliers_plot')]), - ]) - - return preprocessing + extract_mean = Node( + MeanImage(), + name = 'extract_mean', + ) + + reg = create_reg_workflow() + reg.inputs.inputspec.target_image = Info.standard_image('MNI152_T1_2mm_brain.nii.gz') + reg.inputs.inputspec.target_image_brain = Info.standard_image('MNI152_T1_2mm_brain.nii.gz') + reg.inputs.mean2anat.dof = 12 + reg.inputs.mean2anatbbr.dof = 12 + reg.inputs.anat2target_linear.dof = 12 + + outliers = Node(MotionOutliers(metric='fd'), name='outliers') + + + mc_smooth = create_featreg_preproc( + name='featpreproc', + highpass=True, + whichvol='middle', + whichrun=0) + + mc_smooth.inputs.inputspec.fwhm = 5 + mc_smooth.inputs.inputspec.highpass = 100 + + # [INFO] The following part defines the nipype workflow and the connections between nodes + + preprocessing = Workflow( + base_dir = self.directories.working_dir, + name = 'preprocessing' + ) + + preprocessing.connect( + [ + ( + info_source, + select_files, + [('subject_id', 'subject_id'), + ('run_id', 'run_id')], + ), + ( + select_files, + img2float, + [('func', 'in_file')], + ), + ( + img2float, + extract_mean, + [('out_file', 'in_file')], + ), + ( + extract_mean, + reg, + [('out_file', 'inputspec.mean_image')], + ), + ( + select_files, + reg, + [('func', 'inputspec.source_files'), ('anat', 'inputspec.anatomical_image')], + ), + ( + select_files, + mc_smooth, + [('func', 'inputspec.func')], + ), + ( + mc_smooth, + outliers, + [('img2float.out_file', 'in_file')] + ), + + ( + reg, + data_sink, + [('outputspec.anat2target_transform', 'preprocess.@transfo_all'), + ('outputspec.func2anat_transform', 'preprocess.@transfo_init')], + ), + ( + mc_smooth, + data_sink, + [('outputspec.motion_parameters', 
'preprocess.@parameters_file'),
+                    ('outputspec.highpassed_files', 'preprocess.@hp'),
+                    ('outputspec.smoothed_files', 'preprocess.@smooth'),
+                    ('outputspec.mask', 'preprocess.@mask')],
+                ),
+                (
+                    outliers,
+                    data_sink,
+                    [('out_file', 'preprocess.@outliers')]
+                )
+            ]
+        )
+
+        # [INFO] Here we simply return the created workflow
+        return preprocessing
+
+    def get_preprocessing_outputs(self):
+        """ Return the names of the files the preprocessing is supposed to generate. """
+
+        # Outputs are written by the DataSink into the 'preprocess' container,
+        # inside sub-directories named after the subject_id / run_id iterables
+        templates = [join(
+            self.directories.output_dir,
+            'preprocess', f'_run_id_{run_id}'+'_subject_id_{subject_id}', '_addmean0',
+            'sub-{subject_id}_'+f'task-MGT_run-{run_id}_bold_dtype_mcf_mask_smooth_mask_gms_tempfilt_maths.nii.gz')\
+            for run_id in self.run_list]
+
+        templates += [join(
+            self.directories.output_dir,
+            'preprocess', f'_run_id_{run_id}'+'_subject_id_{subject_id}', '_dilatemask0',
+            'sub-{subject_id}_'+f'task-MGT_run-{run_id}_bold_dtype_mcf_bet_thresh_dil.nii.gz')\
+            for run_id in self.run_list]
+
+        templates += [join(
+            self.directories.output_dir,
+            'preprocess', f'_run_id_{run_id}'+'_subject_id_{subject_id}', '_maskfunc30',
+            'sub-{subject_id}_'+f'task-MGT_run-{run_id}_bold_dtype_mcf_mask_smooth_mask.nii.gz')\
+            for run_id in self.run_list]
+
+        templates += [join(
+            self.directories.output_dir,
+            'preprocess', f'_run_id_{run_id}'+'_subject_id_{subject_id}', '_realign0',
+            'sub-{subject_id}_'+f'task-MGT_run-{run_id}_bold_dtype_mcf.nii.gz.par')\
+            for run_id in self.run_list]
+
+        templates += [join(
+            self.directories.output_dir,
+            'preprocess', f'_run_id_{run_id}'+'_subject_id_{subject_id}',
+            'sub-{subject_id}_T1w_fieldwarp.nii.gz')\
+            for run_id in self.run_list]
+
+        templates += [join(
+            self.directories.output_dir,
+            'preprocess', f'_run_id_{run_id}'+'_subject_id_{subject_id}',
+            'sub-{subject_id}_'+f'task-MGT_run-{run_id}_bold_dtype_mean_flirt.mat')\
+            for run_id in self.run_list]
+
+        # Format with subject_ids
+        return_list = []
+        for template in templates:
+            return_list += [template.format(subject_id = s) for s in self.subject_list]
+
+        return return_list
+
+    # [INFO] This function is used in the subject level analysis pipelines using FSL
+    def get_subject_infos(event_file: str):
+        """
+        Create Bunches for specifyModel.
+
+        Parameters :
+        - event_file : file corresponding to the run and the subject to analyze
+
+        Returns :
+        - subject_info : list of Bunch for 1st level analysis.
+        """
+
+        from nipype.interfaces.base import Bunch
+
+        cond_names = ['trial', 'gain', 'loss']
+
+        onset = {}
+        duration = {}
+        amplitude = {}
+
+        for c in cond_names: # For each condition.
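+            # [INFO] For each condition we keep three parallel lists (onsets,
+            # durations, amplitudes); they are wrapped into a single Bunch for
+            # SpecifyModel once the events file has been parsed.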
+            onset.update({c : []}) # creates dictionary items with empty lists
+            duration.update({c : []})
+            amplitude.update({c : []})
+
+        with open(event_file, 'rt') as f:
+            next(f)  # skip the header
+
+            for line in f:
+                info = line.strip().split()
+                # Creates lists with onsets, durations and loss/gain amplitudes (FSL)
+                for c in cond_names:
+                    onset[c].append(float(info[0]))
+                    duration[c].append(float(4))
+                    if c == 'gain':
+                        amplitude[c].append(float(info[2]))
+                    elif c == 'loss':
+                        amplitude[c].append(float(info[3]))
+                    elif c == 'trial':
+                        amplitude[c].append(float(1))
+                    # [INFO] 'response' is not part of cond_names, so the branch
+                    # below is currently inactive
+                    elif c == 'response':
+                        if info[5] == 'weakly_accept':
+                            amplitude[c].append(float(1))
+                        elif info[5] == 'strongly_accept':
+                            amplitude[c].append(float(1))
+                        elif info[5] == 'weakly_reject':
+                            amplitude[c].append(float(0))
+                        elif info[5] == 'strongly_reject':
+                            amplitude[c].append(float(0))
+                    else:
+                        amplitude[c].append(float(0))
+
+        subject_info = []
+
+        subject_info.append(Bunch(
+            conditions=cond_names,
+            onsets=[onset[k] for k in cond_names],
+            durations=[duration[k] for k in cond_names],
+            amplitudes=[amplitude[k] for k in cond_names],
+            regressor_names=None,
+            regressors=None)
+        )
+
+        return subject_info
+
+    # [INFO] This function creates the contrasts that will be analyzed in the first level analysis
+    def get_contrasts():
+        '''
+        Create the list of tuples that represents contrasts.
+        Each contrast is in the form :
+        (Name, Stat, [list of condition names], [weights on those conditions])
+
+        Returns:
+        - contrasts: list of tuples, list of contrasts to analyze
+        '''
+        # list of condition names
+        conditions = ['gain', 'loss']
+
+        # create contrasts
+        gain = ('gain', 'T', conditions, [1, 0])
+
+        loss = ('loss', 'T', conditions, [0, 1])
+
+        # contrast list
+        contrasts = [gain, loss]
+
+        return contrasts
+
+    def get_run_level_analysis(self):
+        """
+        Returns the first level analysis workflow.
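+        The GLM is fitted run by run with FILM prewhitening (FILMGLS), following
+        the team description quoted in the original script.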
+
+        Returns:
+        - l1_analysis : Nipype WorkFlow
+        """
+        # Infosource Node - To iterate on subjects and runs
+        info_source = Node(
+            IdentityInterface(
+                fields = ['subject_id', 'run_id']
+            ),
+            name = 'info_source'
+        )
+
+        info_source.iterables = [
+            ('subject_id', self.subject_list),
+            ('run_id', self.run_list)
+        ]
+
+        templates = {
+            'func': join(
+                self.directories.output_dir,
+                'preprocess',
+                '_run_id_{run_id}_subject_id_{subject_id}', '_addmean0',
+                'sub-{subject_id}_task-MGT_run-{run_id}_bold_dtype_mcf_mask_smooth_mask_gms_tempfilt_maths.nii.gz',
+            ),
+            'event': join(
+                self.directories.dataset_dir,
+                'sub-{subject_id}',
+                'func',
+                'sub-{subject_id}_task-MGT_run-{run_id}_events.tsv',
+            ),
+            'param': join(
+                self.directories.output_dir,
+                'preprocess',
+                '_run_id_{run_id}_subject_id_{subject_id}', '_realign0',
+                'sub-{subject_id}_task-MGT_run-{run_id}_bold_dtype_mcf.nii.gz.par'
+            )
+        }
+
+        # SelectFiles node - to select necessary files
+        select_files = Node(
+            SelectFiles(templates, base_directory = self.directories.dataset_dir),
+            name = 'select_files'
+        )
+
+        # DataSink Node - store the wanted results in the wanted repository
+        data_sink = Node(
+            DataSink(base_directory = self.directories.output_dir),
+            name = 'data_sink'
+        )
+
+        # [INFO] This is the node executing the get_subject_infos function
+        # Subject Infos node - get subject specific condition information;
+        # 'event_file' is its only input, connected from select_files below
+        subject_infos = Node(
+            Function(
+                input_names = ['event_file'],
+                output_names = ['subject_info'],
+                function = self.get_subject_infos,
+            ),
+            name = 'subject_infos',
+        )
+
+        # Contrasts node - to get contrasts
+        contrasts = Node(
+            Function(
+                output_names = ['contrasts'],
+                function = self.get_contrasts,
+            ),
+            name = 'contrasts',
+        )
+
+        specify_model = Node(
+            SpecifyModel(
+                high_pass_filter_cutoff = 100,
+                input_units = 'secs',
+                time_repetition = 1,
+            ),
+            name = 'specify_model'
+        )
+
+        l1_design = Node(
+            Level1Design(
+                bases = {'dgamma':{'derivs' : True}},
+                interscan_interval = 1,
+                model_serial_correlations = True,
+            ),
+            name = 'l1_design'
+        )
+
+        model_generation = Node(
+            FEATModel(),
+            name = 'model_generation'
+        )
+
+        model_estimate = Node(
+            FILMGLS(),
+            name='model_estimate'
+        )
+
+        # Create l1 analysis workflow and connect its nodes
+        l1_analysis = Workflow(
+            base_dir = self.directories.working_dir,
+            name = "run_level_analysis"
+        )
+
+        l1_analysis.connect([
+            (
+                info_source,
+                select_files,
+                [('subject_id', 'subject_id'),
+                ('run_id', 'run_id')]
+            ),
+            (
+                select_files,
+                subject_infos,
+                [('event', 'event_file')]
+            ),
+            (
+                select_files,
+                specify_model,
+                [('param', 'realignment_parameters')]
+            ),
+            (
+                select_files,
+                specify_model,
+                [('func', 'functional_runs')]
+            ),
+            (
+                subject_infos,
+                specify_model,
+                [('subject_info', 'subject_info')]
+            ),
+            (
+                contrasts,
+                l1_design,
+                [('contrasts', 'contrasts')]
+            ),
+            (
+                specify_model,
+                l1_design,
+                [('session_info', 'session_info')]
+            ),
+            (
+                l1_design,
+                model_generation,
+                [('ev_files', 'ev_files'),
+                ('fsf_files', 'fsf_file')]
+            ),
+            (
+                select_files,
+                model_estimate,
+                [('func', 'in_file')]
+            ),
+            (
+                model_generation,
+                model_estimate,
+                [('con_file', 'tcon_file'),
+                ('design_file', 'design_file')]
+            ),
+            (
+                model_estimate,
+                data_sink,
+                [('results_dir', 'run_level_analysis.@results')]
+            ),
+            (
+                model_generation,
+                data_sink,
+                [('design_file', 'run_level_analysis.@design_file'),
+                ('design_image', 'run_level_analysis.@design_img')]
+            )
+        ])
+
+        return l1_analysis
+
+    def get_registration(self):
+        """ Return a Nipype workflow describing the registration part of the pipeline """
+
+        # Infosource Node - To iterate on subjects
+        info_source = Node(
+            IdentityInterface(
+                fields = ['subject_id', 'contrast_id', 'run_id'],
+            ),
+            name='info_source',
+        )
+        info_source.iterables = [('subject_id', self.subject_list),
+                                 ('contrast_id', self.contrast_list),
+                                 ('run_id', self.run_list)]
+
+        # Templates to select files node
+        # [TODO] Change the name of the files depending on the filenames of results of preprocessing
+        templates = {
+            'cope': join(
+                self.directories.output_dir,
+                'run_level_analysis',
+                '_run_id_{run_id}_subject_id_{subject_id}',
+                'results',
+                'cope{contrast_id}.nii.gz',
+            ),
+            'varcope': join(
+                self.directories.output_dir,
+                'run_level_analysis',
+                '_run_id_{run_id}_subject_id_{subject_id}',
+                'results',
+                'varcope{contrast_id}.nii.gz',
+            ),
+            'func2anat_transform':join(
+                self.directories.output_dir,
+                'preprocess',
+                '_run_id_{run_id}_subject_id_{subject_id}',
+                'sub-{subject_id}_task-MGT_run-{run_id}_bold_dtype_mean_flirt.mat'
+            ),
+            'anat2target_transform':join(
+                self.directories.output_dir,
+                'preprocess',
+                '_run_id_{run_id}_subject_id_{subject_id}',
+                'sub-{subject_id}_T1w_fieldwarp.nii.gz'
+            )
+        }
+
+        # SelectFiles node - to select necessary files
+        select_files = Node(
+            SelectFiles(templates, base_directory = self.directories.dataset_dir),
+            name = 'select_files'
+        )
+
+        # DataSink Node - store the wanted results in the wanted repository
+        data_sink = Node(
+            DataSink(base_directory = self.directories.output_dir),
+            name = 'data_sink'
+        )
+
+        warpall_cope = MapNode(
+            ApplyWarp(interp='spline'),
+            name='warpall_cope',
+            iterfield=['in_file']
+        )
+        warpall_cope.inputs.ref_file = Info.standard_image('MNI152_T1_2mm_brain.nii.gz')
+        warpall_cope.inputs.mask_file = Info.standard_image('MNI152_T1_2mm_brain_mask.nii.gz')
+
+        warpall_varcope = MapNode(
+            ApplyWarp(interp='spline'),
+            name='warpall_varcope',
+            iterfield=['in_file']
+        )
+        warpall_varcope.inputs.ref_file = Info.standard_image('MNI152_T1_2mm_brain.nii.gz')
+        warpall_varcope.inputs.mask_file = Info.standard_image('MNI152_T1_2mm_brain_mask.nii.gz')
+
+        # Create registration workflow and connect its nodes
+        registration = Workflow(
+            base_dir = self.directories.working_dir,
+            name = "registration"
+        )
+
+        registration.connect([
+            (
+                info_source,
+                select_files,
+                [('subject_id', 'subject_id'),
+                ('run_id', 'run_id'),
+                ('contrast_id', 'contrast_id')]
+            ),
+            (
+                select_files,
+                warpall_cope,
+                [('func2anat_transform', 'premat'),
+                ('anat2target_transform', 'field_file'),
+                ('cope', 'in_file')]
+            ),
+            (
+                select_files,
+                warpall_varcope,
+                [('func2anat_transform', 'premat'),
+                ('anat2target_transform', 'field_file'),
+                ('varcope', 'in_file')]
+            ),
+            (
+                warpall_cope,
+                data_sink,
+                [('out_file', 'registration.@reg_cope')]
+            ),
+            (
+                warpall_varcope,
+                data_sink,
+                [('out_file', 'registration.@reg_varcope')]
+            )
+        ])
+
+        return registration
+
+    def get_subject_level_analysis(self):
+        """ Return a Nipype workflow describing the subject level analysis part of the pipeline """
+
+        # [INFO] The following part stays the same for all pipelines
+
+        # Infosource Node - To iterate on subjects
+        info_source = Node(
+            IdentityInterface(
+                fields = ['subject_id', 'contrast_id'],
+            ),
+            name='info_source',
+        )
+        info_source.iterables = [('subject_id', self.subject_list), ('contrast_id', self.contrast_list)]
+
+        # Templates to select files node
+        # [TODO] Change the name of the files depending on the filenames of results of preprocessing
+        templates = {
+            'cope': join(
+                self.directories.output_dir,
+                'registration',
+                '_contrast_id_{contrast_id}_run_id_*_subject_id_{subject_id}',
+                '_warpall_cope0',
+                'cope{contrast_id}_warp.nii.gz',
+            ),
+            'varcope': join(
+                self.directories.output_dir,
+                'registration',
+                '_contrast_id_{contrast_id}_run_id_*_subject_id_{subject_id}',
+                '_warpall_varcope0',
+                'varcope{contrast_id}_warp.nii.gz',
+            )
+        }
+
+        # SelectFiles node - to select necessary files
+        select_files = Node(
+            SelectFiles(templates, base_directory = self.directories.dataset_dir),
+            name = 'select_files'
+        )
+
+        # DataSink Node - store the wanted results in the wanted repository
+        data_sink = Node(
+            DataSink(base_directory = self.directories.output_dir),
+            name = 'data_sink'
+        )
+
+        # Generate design matrix
+        specify_model = Node(L2Model(num_copes = len(self.run_list)), name='l2model')
+
+        # Merge copes and varcopes files for each subject
+        merge_copes = Node(Merge(dimension='t'), name='merge_copes')
+
+        merge_varcopes = Node(Merge(dimension='t'), name='merge_varcopes')
+
+        # Second level (single-subject, mean of all four scans) analyses: Fixed effects analysis.
+        flame = Node(
+            FLAMEO(run_mode = 'fe', mask_file = Info.standard_image('MNI152_T1_2mm_brain_mask.nii.gz')),
+            name='flameo'
+        )
+
+        # [INFO] The following part defines the nipype workflow and the connections between nodes
+        subject_level_analysis = Workflow(
+            base_dir = self.directories.working_dir,
+            name = 'subject_level_analysis'
+        )
+        # [INFO] Input and output names can be found on NiPype documentation
+        subject_level_analysis.connect([
+            (
+                info_source,
+                select_files,
+                [('subject_id', 'subject_id'),
+                ('contrast_id', 'contrast_id')]
+            ),
+            (
+                select_files,
+                merge_copes,
+                [('cope', 'in_files')]
+            ),
+            (
+                select_files,
+                merge_varcopes,
+                [('varcope', 'in_files')]
+            ),
+            (
+                merge_copes,
+                flame,
+                [('merged_file', 'cope_file')]
+            ),
+            (
+                merge_varcopes,
+                flame,
+                [('merged_file', 'var_cope_file')]
+            ),
+            (
+                specify_model,
+                flame,
+                [('design_mat', 'design_file'),
+                ('design_con', 't_con_file'),
+                ('design_grp', 'cov_split_file')]
+            ),
+            (
+                flame,
+                data_sink,
+                [('zstats', 'subject_level_analysis.@stats'),
+                ('tstats', 'subject_level_analysis.@tstats'),
+                ('copes', 'subject_level_analysis.@copes'),
+                ('var_copes', 'subject_level_analysis.@varcopes')]
+            ),
+        ])
+
+        # [INFO] Here we simply return the created workflow
+        return subject_level_analysis
+
+    # [INFO] This function returns the list of ids and files of each group of participants
+    # to do analyses for both groups, and one between the two groups.
+    def get_subgroups_contrasts(
+        copes, varcopes, subject_list: list, participants_file: str
+    ):
+        """
+        This function returns the file lists containing only the files
+        belonging to subjects in the wanted group.
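+        Group membership is read from the participants file: the second column
+        of each row (the NARPS 'group' column) is compared to 'equalIndifference'
+        or 'equalRange'.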
+
+        Parameters:
+        - copes: original file list selected by select_files node
+        - varcopes: original file list selected by select_files node
+        - subject_list: list of subject IDs that are analyzed
+        - participants_file: file containing participants characteristics
+
+        Returns:
+        - copes_equal_indifference : a subset of copes corresponding to subjects
+        in the equalIndifference group
+        - copes_equal_range : a subset of copes corresponding to subjects
+        in the equalRange group
+        - copes_global : a list of all copes
+        - varcopes_equal_indifference : a subset of varcopes corresponding to subjects
+        in the equalIndifference group
+        - varcopes_equal_range : a subset of varcopes corresponding to subjects
+        in the equalRange group
+        - equal_indifference_id : a list of subject ids in the equalIndifference group
+        - equal_range_id : a list of subject ids in the equalRange group
+        - varcopes_global : a list of all varcopes
+        """
+
+        equal_range_id = []
+        equal_indifference_id = []
+
+        # Reading file containing participants IDs and groups
+        with open(participants_file, 'rt') as file:
+            next(file)  # skip the header
+
+            for line in file:
+                info = line.strip().split()
+
+                # Checking for each participant if its ID was selected
+                # and separate people depending on their group
+                if info[0][-3:] in subject_list and info[1] == 'equalIndifference':
+                    equal_indifference_id.append(info[0][-3:])
+                elif info[0][-3:] in subject_list and info[1] == 'equalRange':
+                    equal_range_id.append(info[0][-3:])
+
+        copes_equal_indifference = []
+        copes_equal_range = []
+        copes_global = []
+        varcopes_equal_indifference = []
+        varcopes_equal_range = []
+        varcopes_global = []
+
+        # Checking for each selected file if the corresponding participant was selected
+        # and add the file to the list corresponding to its group
+        for cope, varcope in zip(copes, varcopes):
+            sub_id = cope.split('/')
+            if sub_id[-2][-3:] in equal_indifference_id:
+                copes_equal_indifference.append(cope)
+            elif sub_id[-2][-3:] in equal_range_id:
+                copes_equal_range.append(cope)
+            if sub_id[-2][-3:] in subject_list:
+                copes_global.append(cope)
+
+            sub_id = varcope.split('/')
+            if sub_id[-2][-3:] in equal_indifference_id:
+                varcopes_equal_indifference.append(varcope)
+            elif sub_id[-2][-3:] in equal_range_id:
+                varcopes_equal_range.append(varcope)
+            if sub_id[-2][-3:] in subject_list:
+                varcopes_global.append(varcope)
+
+        return copes_equal_indifference, copes_equal_range, \
+            varcopes_equal_indifference, varcopes_equal_range, \
+            equal_indifference_id, equal_range_id, \
+            copes_global, varcopes_global
+
+    # [INFO] This function creates the dictionary of regressors used in FSL Nipype pipelines
+    def get_regressors(
+        equal_range_id: list,
+        equal_indifference_id: list,
+        method: str,
+        subject_list: list,
+    ) -> dict:
+        """
+        Create dictionary of regressors for group analysis.
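+        As an example, with method = 'equalIndifference' and four subjects in that
+        group, the function returns {'group_mean': [1, 1, 1, 1]}.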
+
+        Parameters:
+        - equal_range_id: ids of subjects in equal range group
+        - equal_indifference_id: ids of subjects in equal indifference group
+        - method: one of "equalRange", "equalIndifference" or "groupComp"
+        - subject_list: ids of subjects for which to do the analysis
+
+        Returns:
+        - regressors: regressors used to distinguish groups in FSL group analysis
+        """
+        # For one sample t-test, creates a dictionary
+        # with a list of the size of the number of participants
+        if method == 'equalRange':
+            regressors = dict(group_mean = [1 for i in range(len(equal_range_id))])
+        elif method == 'equalIndifference':
+            regressors = dict(group_mean = [1 for i in range(len(equal_indifference_id))])
+
+        # For two sample t-test, creates 2 lists:
+        #  - one for equal range group,
+        #  - one for equal indifference group
+        # Each list contains n_sub values with 0 and 1 depending on the group of the participant
+        # For equalRange_reg list --> participants with a 1 are in the equal range group
+        elif method == 'groupComp':
+            equalRange_reg = [
+                1 for i in range(len(equal_range_id) + len(equal_indifference_id))
+            ]
+            equalIndifference_reg = [
+                0 for i in range(len(equal_range_id) + len(equal_indifference_id))
+            ]
+
+            for index, subject_id in enumerate(subject_list):
+                if subject_id in equal_indifference_id:
+                    equalIndifference_reg[index] = 1
+                    equalRange_reg[index] = 0
+
+            regressors = dict(
+                equalRange = equalRange_reg,
+                equalIndifference = equalIndifference_reg
+            )
+
+        return regressors
+
+    def get_group_level_analysis(self):
+        """
+        Return all workflows for the group level analysis.
+
+        Returns:
+        - a list of nipype.WorkFlow
+        """
+
+        methods = ['equalRange', 'equalIndifference', 'groupComp']
+        return [self.get_group_level_analysis_sub_workflow(method) for method in methods]
+
+    def get_group_level_analysis_sub_workflow(self, method):
+        """
+        Return a workflow for the group level analysis.
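+        Each workflow merges the per-subject copes/varcopes along time, fits a
+        FLAME1 mixed-effects model, and thresholds the resulting z-maps with
+        cluster-based inference (z > 3.1).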
+
+        Parameters:
+        - method: one of 'equalRange', 'equalIndifference' or 'groupComp'
+
+        Returns:
+        - group_level_analysis: nipype.WorkFlow
+        """
+        # [INFO] The following part stays the same for all preprocessing pipelines
+
+        # Infosource node - iterate over the list of contrasts generated
+        # by the subject level analysis
+        info_source = Node(
+            IdentityInterface(
+                fields = ['contrast_id', 'subjects'],
+                subjects = self.subject_list
+            ),
+            name = 'info_source',
+        )
+        info_source.iterables = [('contrast_id', self.contrast_list)]
+
+        # Templates to select files node
+        # [TODO] Change the name of the files depending on the filenames
+        # of results of first level analysis
+        templates = {
+            'cope' : join(
+                self.directories.results_dir,
+                'subject_level_analysis',
+                '_contrast_id_{contrast_id}_subject_id_*', 'cope1.nii.gz'),
+            'varcope' : join(
+                self.directories.results_dir,
+                'subject_level_analysis',
+                '_contrast_id_{contrast_id}_subject_id_*', 'varcope1.nii.gz'),
+            'participants' : join(
+                self.directories.dataset_dir,
+                'participants.tsv')
+        }
+        select_files = Node(
+            SelectFiles(
+                templates,
+                base_directory = self.directories.results_dir,
+                force_list = True
+            ),
+            name = 'select_files',
+        )
+
+        # Datasink node - to save important files
+        data_sink = Node(
+            DataSink(base_directory = self.directories.output_dir),
+            name = 'data_sink',
+        )
+
+        subgroups_contrasts = Node(
+            Function(
+                input_names=['copes', 'varcopes', 'subject_list', 'participants_file'],
+                output_names=[
+                    'copes_equalIndifference',
+                    'copes_equalRange',
+                    'varcopes_equalIndifference',
+                    'varcopes_equalRange',
+                    'equalIndifference_id',
+                    'equalRange_id',
+                    'copes_global',
+                    'varcopes_global'
+                ],
+                function = self.get_subgroups_contrasts,
+            ),
+            name = 'subgroups_contrasts',
+        )
+
+        regs = Node(
+            Function(
+                input_names = [
+                    'equalRange_id',
+                    'equalIndifference_id',
+                    'method',
+                    'subject_list',
+                ],
+                output_names = ['regressors'],
+                function = self.get_regressors,
+            ),
+            name = 'regs',
+        )
+        regs.inputs.method = method
+        regs.inputs.subject_list = self.subject_list
+
+        # [INFO] The following part has to be modified with nodes of the pipeline
+
+        # [TODO] For each node, replace 'node_name' by an explicit name, and use it for both:
+        #   - the name of the variable in which you store the Node object
+        #   - the 'name' attribute of the Node
+        # [TODO] The node_function refers to a NiPype interface that you must import
+        # at the beginning of the file.
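+        # [INFO] 'Merge' below is the FSL interface (fslmerge): it concatenates
+        # the cope/varcope images along the time dimension. It is not the utility
+        # Merge node from nipype.interfaces.utility.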
+        merge_copes = Node(
+            Merge(dimension = 't'),
+            name = 'merge_copes'
+        )
+
+        merge_varcopes = Node(
+            Merge(dimension = 't'),
+            name = 'merge_varcopes'
+        )
+
+        specify_model = Node(
+            MultipleRegressDesign(),
+            name = 'specify_model'
+        )
+
+        flame = Node(
+            FLAMEO(
+                run_mode = 'flame1',
+                mask_file = Info.standard_image('MNI152_T1_2mm_brain_mask.nii.gz')
+            ),
+            name='flame'
+        )
+
+        cluster = MapNode(
+            Cluster(
+                threshold = 3.1,
+                out_threshold_file = True
+            ),
+            name = 'cluster',
+            iterfield = ['in_file', 'cope_file'],
+            synchronize = True
+        )
+
+        # [INFO] The following part defines the nipype workflow and the connections between nodes
+
+        # Compute the number of participants used to do the analysis
+        nb_subjects = len(self.subject_list)
+
+        # Declare the workflow
+        group_level_analysis = Workflow(
+            base_dir = self.directories.working_dir,
+            name = f'group_level_analysis_{method}_nsub_{nb_subjects}'
+        )
+        group_level_analysis.connect(
+            [
+                (
+                    info_source,
+                    select_files,
+                    [('contrast_id', 'contrast_id')],
+                ),
+                (
+                    info_source,
+                    subgroups_contrasts,
+                    [('subjects', 'subject_list')],
+                ),
+                (
+                    select_files,
+                    subgroups_contrasts,
+                    [
+                        ('cope', 'copes'),
+                        ('varcope', 'varcopes'),
+                        ('participants', 'participants_file'),
+                    ],
+                ),
+                (
+                    subgroups_contrasts,
+                    regs,
+                    [
+                        ('equalRange_id', 'equalRange_id'),
+                        ('equalIndifference_id', 'equalIndifference_id')
+                    ]
+                ),
+                (
+                    regs,
+                    specify_model,
+                    [('regressors', 'regressors')]
+                )
+            ]
+        ) # Complete with other links between nodes
+
+        # [INFO] Here we define the contrasts used for the group level analysis, depending on the
+        # method used. The regressor names referenced in the contrasts must match
+        # the keys of the dictionary returned by get_regressors.
+        if method in ('equalRange', 'equalIndifference'):
+            contrasts = [
+                ('Group', 'T', ['group_mean'], [1]),
+                ('Group', 'T', ['group_mean'], [-1])
+            ]
+
+        if method == 'equalIndifference':
+            group_level_analysis.connect([
+                (
+                    subgroups_contrasts,
+                    merge_copes,
+                    [('copes_equalIndifference', 'in_files')]
+                ),
+                (
+                    subgroups_contrasts,
+                    merge_varcopes,
+                    [('varcopes_equalIndifference', 'in_files')]
+                )
+            ])
+
+        elif method == 'equalRange':
+            group_level_analysis.connect([
+                (
+                    subgroups_contrasts,
+                    merge_copes,
+                    [('copes_equalRange', 'in_files')]
+                ),
+                (
+                    subgroups_contrasts,
+                    merge_varcopes,
+                    [('varcopes_equalRange', 'in_files')]
+                )
+            ])
+
+        elif method == 'groupComp':
+            contrasts = [
+                ('Eq range vs Eq indiff in loss', 'T', ['equalRange', 'equalIndifference'], [1, -1])
+            ]
+
+            group_level_analysis.connect([
+                (
+                    select_files,
+                    merge_copes,
+                    [('cope', 'in_files')]
+                ),
+                (
+                    select_files,
+                    merge_varcopes,
+                    [('varcope', 'in_files')]
+                )
+            ])
+
+        # Pass the contrasts defined above to the group level design
+        specify_model.inputs.contrasts = contrasts
+
+        group_level_analysis.connect([
+            (
+                merge_copes,
+                flame,
+                [('merged_file', 'cope_file')]
+            ),
+            (
+                merge_varcopes,
+                flame,
+                [('merged_file', 'var_cope_file')]
+            ),
+            (
+                specify_model,
+                flame,
+                [
+                    ('design_mat', 'design_file'),
+                    ('design_con', 't_con_file'),
+                    ('design_grp', 'cov_split_file')
+                ]
+            ),
+            (
+                flame,
+                cluster,
+                [
+                    ('zstats', 'in_file'),
+                    ('copes', 'cope_file')
+                ]
+            ),
+            (
+                flame,
+                data_sink,
+                [
+                    ('zstats', f"group_level_analysis_{method}_nsub_{nb_subjects}.@zstats"),
+                    ('tstats', f"group_level_analysis_{method}_nsub_{nb_subjects}.@tstats")
+                ]
+            ),
+            (
+                cluster,
+                data_sink,
+                [('threshold_file', f"group_level_analysis_{method}_nsub_{nb_subjects}.@thresh")]
+            )
+        ])
+
+        # [INFO] Here we simply return the created workflow
+        return group_level_analysis
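+
+# [INFO] A minimal usage sketch, kept as a comment (hedged: the attribute values
+# below mirror the unit tests; directory configuration depends on the
+# narps_open runner):
+#
+#     pipeline = PipelineTeam43FJ()
+#     pipeline.subject_list = ['001', '002', '003', '004']
+#     pipeline.run_list = ['01', '02', '03', '04']
+#     for workflow in pipeline.get_group_level_analysis():
+#         workflow.run()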