diff --git a/narps_open/pipelines/team_98BT.py b/narps_open/pipelines/team_98BT.py
index 2c1a1911..285f8f49 100755
--- a/narps_open/pipelines/team_98BT.py
+++ b/narps_open/pipelines/team_98BT.py
@@ -38,8 +38,8 @@ def __init__(self):
         self.contrast_list = ['0001', '0002', '0003', '0004']
 
         # Define contrasts
-        gain_conditions = [f'gamble_run{r}xgain_run{r}^1' for r in range(1,len(self.run_list) + 1)]
-        loss_conditions = [f'gamble_run{r}xloss_run{r}^1' for r in range(1,len(self.run_list) + 1)]
+        gain_conditions = [f'gamble_run{r}xgain^1' for r in range(1,len(self.run_list) + 1)]
+        loss_conditions = [f'gamble_run{r}xloss^1' for r in range(1,len(self.run_list) + 1)]
         self.subject_level_contrasts = [
             ('pos_gain', 'T', gain_conditions, [1, 1, 1, 1]),
             ('pos_loss', 'T', loss_conditions, [1, 1, 1, 1]),
@@ -58,60 +58,57 @@ def get_dartel_template_sub_workflow(self):
         Returns:
             - dartel : nipype.WorkFlow
         """
-        # Infosource Node - To iterate on subjects
+
+        # Init workflow
+        dartel_workflow = Workflow(
+            base_dir = self.directories.working_dir, name = 'dartel_workflow')
+
+        # IDENTITY INTERFACE - To iterate on subjects
         information_source = Node(IdentityInterface(
             fields = ['subject_id']),
             name = 'information_source')
         information_source.iterables = ('subject_id', self.subject_list)
 
-        # SelectFiles Node - to select necessary files
+        # SELECT FILES - Select anat file
         template = {
             'anat' : join('sub-{subject_id}', 'anat', 'sub-{subject_id}_T1w.nii.gz')
         }
         select_files = Node(SelectFiles(template), name = 'select_files')
         select_files.inputs.base_directory = self.directories.dataset_dir
+        dartel_workflow.connect(information_source, 'subject_id', select_files, 'subject_id')
 
-        # Gunzip node - SPM do not use .nii.gz files
+        # GUNZIP - SPM does not use .nii.gz files
         gunzip_anat = Node(Gunzip(), name = 'gunzip_anat')
+        dartel_workflow.connect(select_files, 'anat', gunzip_anat, 'in_file')
 
-        def get_dartel_input(structural_files):
-            print(structural_files)
-            return structural_files
-
-        dartel_input = JoinNode(Function(
-            function = get_dartel_input,
-            input_names = ['structural_files'],
-            output_names = ['structural_files']),
-            name = 'dartel_input',
+        # IDENTITY INTERFACE - Join all gunzipped files
+        dartel_inputs = JoinNode(IdentityInterface(fields = ['structural_files']),
+            name = 'dartel_inputs',
             joinsource = 'information_source',
             joinfield = 'structural_files')
+        dartel_workflow.connect(gunzip_anat, 'out_file', dartel_inputs, 'structural_files')
 
+        # RENAME - Rename files before dartel workflow
         rename_dartel = MapNode(Rename(format_string = 'subject_id_%(subject_id)s_struct'),
            iterfield = ['in_file', 'subject_id'],
            name = 'rename_dartel')
         rename_dartel.inputs.subject_id = self.subject_list
         rename_dartel.inputs.keep_ext = True
+        dartel_workflow.connect(dartel_inputs, 'structural_files', rename_dartel, 'in_file')
 
+        # DARTEL - Using an already existing workflow
         dartel_sub_workflow = create_DARTEL_template(name = 'dartel_sub_workflow')
         dartel_sub_workflow.inputs.inputspec.template_prefix = 'template'
+        dartel_workflow.connect(
+            rename_dartel, 'out_file', dartel_sub_workflow, 'inputspec.structural_files')
 
-        # DataSink Node - store the wanted results in the wanted repository
+        # DATASINK - store the wanted results in the wanted repository
         data_sink = Node(DataSink(), name = 'data_sink')
         data_sink.inputs.base_directory = self.directories.output_dir
-
-        # Create dartel workflow and connect its nodes
-        dartel_workflow = Workflow(
-            base_dir = self.directories.working_dir, name = 'dartel_workflow')
-        dartel_workflow.connect([
-            (information_source, select_files, [('subject_id', 'subject_id')]),
-            (select_files, gunzip_anat, [('anat', 'in_file')]),
-            (gunzip_anat, dartel_input, [('out_file', 'structural_files')]),
-            (dartel_input, rename_dartel, [('structural_files', 'in_file')]),
-            (rename_dartel, dartel_sub_workflow, [('out_file', 'inputspec.structural_files')]),
-            (dartel_sub_workflow, data_sink, [
-                ('outputspec.template_file', 'dartel_template.@template_file'),
-                ('outputspec.flow_fields', 'dartel_template.@flow_fields')])
-        ])
+        dartel_workflow.connect(
+            dartel_sub_workflow, 'outputspec.template_file', data_sink, 'dartel_template.@template_file')
+        dartel_workflow.connect(
+            dartel_sub_workflow, 'outputspec.flow_fields', data_sink, 'dartel_template.@flow_fields')
 
         # Remove large files, if requested
         if Configuration()['pipelines']['remove_unused_data']:
@@ -122,12 +119,8 @@ def get_dartel_input(structural_files):
                 input_names = ['_', 'file_name'],
                 output_names = []
                 ), name = 'remove_gunzip')
-
-            # Add connections
-            dartel_workflow.connect([
-                (gunzip_anat, remove_gunzip, [('out_file', 'file_name')]),
-                (data_sink, remove_gunzip, [('out_file', '_')])
-            ])
+            dartel_workflow.connect(gunzip_anat, 'out_file', remove_gunzip, 'file_name')
+            dartel_workflow.connect(data_sink, 'out_file', remove_gunzip, '_')
 
         return dartel_workflow
 
@@ -165,19 +158,26 @@ def get_preprocessing_sub_workflow(self):
         Returns:
             - preprocessing : nipype.WorkFlow
         """
-        # Infosource Node - To iterate on subjects
-        information_source = Node(IdentityInterface(
+        # Create preprocessing workflow
+        preprocessing = Workflow(base_dir = self.directories.working_dir, name = 'preprocessing')
+
+        # IDENTITY INTERFACE - To iterate on subjects
+        information_source_subjects = Node(IdentityInterface(
+            fields = ['subject_id']),
+            name = 'information_source_subjects')
+        information_source_subjects.iterables = ('subject_id', self.subject_list)
+
+        # IDENTITY INTERFACE - To iterate on runs
+        information_source_runs = Node(IdentityInterface(
             fields = ['subject_id', 'run_id']),
-            name = 'information_source')
-        information_source.iterables = [
-            ('subject_id', self.subject_list), ('run_id', self.run_list)
-        ]
+            name = 'information_source_runs')
+        information_source_runs.iterables = ('run_id', self.run_list)
+        preprocessing.connect(
+            information_source_subjects, 'subject_id', information_source_runs, 'subject_id')
 
-        # SelectFiles Node - Select necessary files
+        # SELECT FILES - Select necessary subject files
         templates = {
             'anat' : join('sub-{subject_id}', 'anat', 'sub-{subject_id}_T1w.nii.gz'),
-            'func' : join('sub-{subject_id}', 'func',
-                'sub-{subject_id}_task-MGT_run-{run_id}_bold.nii.gz'),
             'magnitude' : join('sub-{subject_id}', 'fmap', 'sub-{subject_id}_magnitude*.nii.gz'),
             'phasediff' : join('sub-{subject_id}', 'fmap', 'sub-{subject_id}_phasediff.nii.gz'),
             'info_fmap' : join('sub-{subject_id}', 'fmap', 'sub-{subject_id}_phasediff.json'),
@@ -186,26 +186,50 @@ def get_preprocessing_sub_workflow(self):
             'dartel_template' :join(
                 self.directories.output_dir, 'dartel_template', 'template_6.nii')
         }
-        select_files = Node(SelectFiles(templates), name = 'select_files')
-        select_files.inputs.base_directory = self.directories.dataset_dir
+        select_subject_files = Node(SelectFiles(templates), name = 'select_subject_files')
+        select_subject_files.inputs.base_directory = self.directories.dataset_dir
+        preprocessing.connect(
+            information_source_subjects, 'subject_id', select_subject_files, 'subject_id')
 
-        # Gunzip nodes - gunzip files because SPM do not use .nii.gz files
+        # SELECT FILES - Select necessary run files
+        templates = {
+            'func' : join('sub-{subject_id}', 'func',
+                'sub-{subject_id}_task-MGT_run-{run_id}_bold.nii.gz')
+        }
+        select_run_files = Node(SelectFiles(templates), name = 'select_run_files')
+        select_run_files.inputs.base_directory = self.directories.dataset_dir
+        preprocessing.connect(information_source_runs, 'subject_id', select_run_files, 'subject_id')
+        preprocessing.connect(information_source_runs, 'run_id', select_run_files, 'run_id')
+
+        # GUNZIP - gunzip files because SPM does not use .nii.gz files
         gunzip_anat = Node(Gunzip(), name = 'gunzip_anat')
         gunzip_func = Node(Gunzip(), name = 'gunzip_func')
         gunzip_magnitude = Node(Gunzip(), name = 'gunzip_magnitude')
         gunzip_phasediff = Node(Gunzip(), name = 'gunzip_phasediff')
+        preprocessing.connect(select_subject_files, 'anat', gunzip_anat, 'in_file')
+        preprocessing.connect(select_run_files, 'func', gunzip_func, 'in_file')
+        preprocessing.connect(select_subject_files, 'phasediff', gunzip_phasediff, 'in_file')
 
-        # Function Node get_fieldmap_info -
+        # FUNCTION Node get_fieldmap_info - Retrieve magnitude and phasediff metadata to decide
+        # which files to use for the fieldmap node, and what echo times
         fieldmap_info = Node(Function(
             function = self.get_fieldmap_info,
             input_names = ['fieldmap_info_file', 'magnitude_files'],
             output_names = ['echo_times', 'magnitude_file']),
             name = 'fieldmap_info')
+        preprocessing.connect(
+            select_subject_files, 'info_fmap', fieldmap_info, 'fieldmap_info_file')
+        preprocessing.connect(select_subject_files, 'magnitude', fieldmap_info, 'magnitude_files')
+        preprocessing.connect(fieldmap_info, 'magnitude_file', gunzip_magnitude, 'in_file')
 
-        # FieldMap Node -
+        # FIELDMAP Node - create fieldmap from phasediff and magnitude files
         fieldmap = Node(FieldMap(), name = 'fieldmap')
         fieldmap.inputs.blip_direction = -1
         fieldmap.inputs.total_readout_time = TaskInformation()['TotalReadoutTime']
+        preprocessing.connect(fieldmap_info, 'echo_times', fieldmap, 'echo_times')
+        preprocessing.connect(gunzip_magnitude, 'out_file', fieldmap, 'magnitude_file')
+        preprocessing.connect(gunzip_phasediff, 'out_file', fieldmap, 'phase_file')
+        preprocessing.connect(gunzip_func, 'out_file', fieldmap, 'epi_file')
 
         # Get SPM Tissue Probability Maps file
         spm_tissues_file = join(SPMInfo.getinfo()['path'], 'tpm', 'TPM.nii')
@@ -222,6 +246,7 @@ def get_preprocessing_sub_workflow(self):
             [(spm_tissues_file, 5), 4, (True,False), (False, False)],
             [(spm_tissues_file, 6), 2, (False,False), (False, False)]
         ]
+        preprocessing.connect(gunzip_anat, 'out_file', segmentation, 'channel_files')
 
         # Slice timing - SPM slice time correction with default parameters
         slice_timing = Node(SliceTiming(), name = 'slice_timing')
@@ -230,98 +255,99 @@ def get_preprocessing_sub_workflow(self):
         slice_timing.inputs.slice_order = TaskInformation()['SliceTiming']
         slice_timing.inputs.time_acquisition = TaskInformation()['AcquisitionTime']
         slice_timing.inputs.time_repetition = TaskInformation()['RepetitionTime']
+        preprocessing.connect(gunzip_func, 'out_file', slice_timing, 'in_files')
 
         # Motion correction - SPM realign and unwarp
         motion_correction = Node(RealignUnwarp(), name = 'motion_correction')
         motion_correction.inputs.interp = 4
+        preprocessing.connect(fieldmap, 'vdm', motion_correction, 'phase_map')
+        preprocessing.connect(slice_timing, 'timecorrected_files', motion_correction, 'in_files')
 
         # Intrasubject coregistration
         extract_first = Node(ExtractROI(), name = 'extract_first')
         extract_first.inputs.t_min = 1
         extract_first.inputs.t_size = 1
         extract_first.inputs.output_type = 'NIFTI'
+        preprocessing.connect(
+            motion_correction, 'realigned_unwarped_files', extract_first, 'in_file')
 
         coregistration = Node(Coregister(), name = 'coregistration')
         coregistration.inputs.cost_function = 'nmi'
         coregistration.inputs.jobtype = 'estimate'
+        preprocessing.connect(
+            motion_correction, 'realigned_unwarped_files', coregistration, 'apply_to_files')
+        preprocessing.connect(gunzip_anat, 'out_file', coregistration, 'target')
+        preprocessing.connect(extract_first, 'roi_file', coregistration, 'source')
 
         dartel_norm_func = Node(DARTELNorm2MNI(), name = 'dartel_norm_func')
         dartel_norm_func.inputs.fwhm = self.fwhm
         dartel_norm_func.inputs.modulate = False
         dartel_norm_func.inputs.voxel_size = (2.3, 2.3, 2.15)
+        preprocessing.connect(
+            select_subject_files, 'dartel_flow_field', dartel_norm_func, 'flowfield_files')
+        preprocessing.connect(
+            select_subject_files, 'dartel_template', dartel_norm_func, 'template_file')
+        preprocessing.connect(
+            coregistration, 'coregistered_files', dartel_norm_func, 'apply_to_files')
 
         dartel_norm_anat = Node(DARTELNorm2MNI(), name = 'dartel_norm_anat')
         dartel_norm_anat.inputs.fwhm = self.fwhm
         dartel_norm_anat.inputs.voxel_size = (1, 1, 1)
+        preprocessing.connect(
+            select_subject_files, 'dartel_flow_field', dartel_norm_anat, 'flowfield_files')
+        preprocessing.connect(
+            select_subject_files, 'dartel_template', dartel_norm_anat, 'template_file')
+        preprocessing.connect(gunzip_anat, 'out_file', dartel_norm_anat, 'apply_to_files')
 
         # DataSink Node - store the wanted results in the wanted repository
         data_sink = Node(DataSink(), name = 'data_sink')
         data_sink.inputs.base_directory = self.directories.output_dir
-
-        # Create preprocessing workflow and connect its nodes
-        preprocessing = Workflow(base_dir = self.directories.working_dir, name = 'preprocessing')
-        preprocessing.connect([
-            (information_source, select_files, [
-                ('subject_id', 'subject_id'),
-                ('run_id', 'run_id')]),
-            (select_files, gunzip_anat, [('anat', 'in_file')]),
-            (select_files, gunzip_func, [('func', 'in_file')]),
-            (select_files, gunzip_phasediff, [('phasediff', 'in_file')]),
-            (select_files, fieldmap_info, [
-                ('info_fmap', 'fieldmap_info_file'),
-                ('magnitude', 'magnitude_files')]),
-            (fieldmap_info, gunzip_magnitude, [('magnitude_file', 'in_file')]),
-            (fieldmap_info, fieldmap, [('echo_times', 'echo_times')]),
-            (gunzip_magnitude, fieldmap, [('out_file', 'magnitude_file')]),
-            (gunzip_phasediff, fieldmap, [('out_file', 'phase_file')]),
-            (gunzip_func, fieldmap, [('out_file', 'epi_file')]),
-            (fieldmap, motion_correction, [('vdm', 'phase_map')]),
-            (gunzip_anat, segmentation, [('out_file', 'channel_files')]),
-            (gunzip_func, slice_timing, [('out_file', 'in_files')]),
-            (slice_timing, motion_correction, [('timecorrected_files', 'in_files')]),
-            (motion_correction, coregistration, [('realigned_unwarped_files', 'apply_to_files')]),
-            (gunzip_anat, coregistration, [('out_file', 'target')]),
-            (motion_correction, extract_first, [('realigned_unwarped_files', 'in_file')]),
-            (extract_first, coregistration, [('roi_file', 'source')]),
-            (select_files, dartel_norm_func, [
-                ('dartel_flow_field', 'flowfield_files'),
-                ('dartel_template', 'template_file')]),
-            (select_files, dartel_norm_anat, [
-                ('dartel_flow_field', 'flowfield_files'),
-                ('dartel_template', 'template_file')]),
-            (gunzip_anat, dartel_norm_anat, [('out_file', 'apply_to_files')]),
-            (coregistration, dartel_norm_func, [('coregistered_files', 'apply_to_files')]),
-            (dartel_norm_func, data_sink, [
-                ('normalized_files', 'preprocessing.@normalized_files')]),
-            (motion_correction, data_sink, [
-                ('realigned_unwarped_files', 'preprocessing.@motion_corrected'),
-                ('realignment_parameters', 'preprocessing.@param')]),
-            (segmentation, data_sink, [('normalized_class_images', 'preprocessing.@seg')]),
-        ])
+        preprocessing.connect(
+            dartel_norm_func, 'normalized_files', data_sink, 'preprocessing.@normalized_files')
+        preprocessing.connect(
+            motion_correction, 'realigned_unwarped_files',
+            data_sink, 'preprocessing.@motion_corrected')
+        preprocessing.connect(
+            motion_correction, 'realignment_parameters', data_sink, 'preprocessing.@param')
+        preprocessing.connect(
+            segmentation, 'normalized_class_images', data_sink, 'preprocessing.@seg')
 
         # Remove large files, if requested
         if Configuration()['pipelines']['remove_unused_data']:
 
-            # Merge Node - Merge file names to be removed after datasink node is performed
-            merge_removable_files = Node(Merge(5), name = 'merge_removable_files')
-            merge_removable_files.inputs.ravel_inputs = True
+            # Merge Node - Merge anat file names to be removed after datasink node is performed
+            merge_removable_anat_files = Node(Merge(3), name = 'merge_removable_anat_files')
+            merge_removable_anat_files.inputs.ravel_inputs = True
 
-            # Function Nodes remove_files - Remove sizeable files once they aren't needed
-            remove_after_datasink = MapNode(Function(
+            # Function Nodes remove_files - Remove sizeable anat files once they aren't needed
+            remove_anat_after_datasink = MapNode(Function(
                 function = remove_parent_directory,
                 input_names = ['_', 'file_name'],
                 output_names = []
-                ), name = 'remove_after_datasink', iterfield = 'file_name')
+                ), name = 'remove_anat_after_datasink', iterfield = 'file_name')
+            preprocessing.connect([
+                (gunzip_phasediff, merge_removable_anat_files, [('out_file', 'in1')]),
+                (gunzip_magnitude, merge_removable_anat_files, [('out_file', 'in2')]),
+                (fieldmap, merge_removable_anat_files, [('vdm', 'in3')]),
+                (merge_removable_anat_files, remove_anat_after_datasink, [('out', 'file_name')]),
+                (data_sink, remove_anat_after_datasink, [('out_file', '_')])
+            ])
+
+            # Merge Node - Merge func file names to be removed after datasink node is performed
+            merge_removable_func_files = Node(Merge(2), name = 'merge_removable_func_files')
+            merge_removable_func_files.inputs.ravel_inputs = True
 
-            # Add connections
+            # Function Nodes remove_files - Remove sizeable func files once they aren't needed
+            remove_func_after_datasink = MapNode(Function(
+                function = remove_parent_directory,
+                input_names = ['_', 'file_name'],
+                output_names = []
+                ), name = 'remove_func_after_datasink', iterfield = 'file_name')
             preprocessing.connect([
-                (gunzip_func, merge_removable_files, [('out_file', 'in1')]),
-                (gunzip_phasediff, merge_removable_files, [('out_file', 'in2')]),
-                (gunzip_magnitude, merge_removable_files, [('out_file', 'in3')]),
-                (fieldmap, merge_removable_files, [('vdm', 'in4')]),
-                (slice_timing, merge_removable_files, [('timecorrected_files', 'in5')]),
-                (merge_removable_files, remove_after_datasink, [('out', 'file_name')]),
-                (data_sink, remove_after_datasink, [('out_file', '_')])
+                (gunzip_func, merge_removable_func_files, [('out_file', 'in1')]),
+                (slice_timing, merge_removable_func_files, [('timecorrected_files', 'in2')]),
+                (merge_removable_func_files, remove_func_after_datasink, [('out', 'file_name')]),
+                (data_sink, remove_func_after_datasink, [('out_file', '_')])
            ])
 
         return preprocessing
 
@@ -352,21 +378,17 @@ def get_preprocessing_outputs(self):
             'run_id': self.run_list,
         }
         parameter_sets = product(*parameters.values())
-        output_dir = join(
-            self.directories.output_dir,
-            'preprocessing',
-            '_run_id_{run_id}_subject_id_{subject_id}'
-        )
+        output_dir = join(self.directories.output_dir, 'preprocessing', '_subject_id_{subject_id}')
+        run_output_dir = join(output_dir, '_run_id_{run_id}')
         templates = [
             # Realignment parameters
-            join(output_dir, 'rp_asub-{subject_id}_task-MGT_run-{run_id}_bold.txt'),
+            join(run_output_dir, 'rp_asub-{subject_id}_task-MGT_run-{run_id}_bold.txt'),
             # Realigned unwarped files
-            join(output_dir, 'uasub-{subject_id}_task-MGT_run-{run_id}_bold.nii'),
+            join(run_output_dir, 'uasub-{subject_id}_task-MGT_run-{run_id}_bold.nii'),
             # Normalized_files
-            join(output_dir, 'swuasub-{subject_id}_task-MGT_run-{run_id}_bold.nii'),
+            join(run_output_dir, 'swuasub-{subject_id}_task-MGT_run-{run_id}_bold.nii'),
             # Normalized class images
-            join(output_dir, 'wc2sub-{subject_id}_T1w.nii'),
-            join(output_dir, 'wc1sub-{subject_id}_T1w.nii')
+            join(output_dir, 'wc2sub-{subject_id}_T1w.nii')
         ]
         return_list += [template.format(**dict(zip(parameters.keys(), parameter_values)))\
             for parameter_values in parameter_sets for template in templates]
@@ -469,10 +491,14 @@ def get_subject_information(event_file: str, short_run_id: int):
                 durations.append(4.0)
                 weights_gain.append(float(info[2]))
                 weights_loss.append(float(info[3]))
-                if 'accept' in str(info[5]):
+                if 'weakly_accept' in str(info[5]):
                     answers.append(1)
+                elif 'strongly_accept' in str(info[5]):
+                    answers.append(2)
+                elif 'weakly_reject' in str(info[5]):
+                    answers.append(-1)
                 else:
-                    answers.append(0)
+                    answers.append(-2)
 
         # Create Bunch
         return Bunch(
@@ -483,10 +509,7 @@ def get_subject_information(event_file: str, short_run_id: int):
             tmod = None,
             pmod = [
                 Bunch(
-                    name = [
-                        f'gain_run{short_run_id}',
-                        f'loss_run{short_run_id}',
-                        f'answers_run{short_run_id}'],
+                    name = ['gain', 'loss', 'answers'],
                     poly = [1, 1, 1],
                     param = [weights_gain, weights_loss, answers]
                 )
@@ -502,45 +525,50 @@ def get_subject_level_analysis(self):
         Returns:
             - subject_level_analysis : nipype.WorkFlow
         """
-        # Infosource Node - To iterate on subjects
+        # Init workflow
+        subject_level_analysis = Workflow(
+            base_dir = self.directories.working_dir,
+            name = 'subject_level_analysis'
+        )
+
+        # IDENTITY INTERFACE - To iterate on subjects
         information_source = Node(IdentityInterface(
             fields = ['subject_id']),
             name = 'information_source')
         information_source.iterables = [('subject_id', self.subject_list)]
 
-        # SelectFiles Node - to select necessary files
+        # SELECT FILES - to select necessary files
         templates = {
             'func' : join(self.directories.output_dir,
-                'preprocessing', '_run_id_*_subject_id_{subject_id}',
+                'preprocessing', '_subject_id_{subject_id}', '_run_id_*',
                 'swuasub-{subject_id}_task-MGT_run-*_bold.nii'),
             'motion_correction': join(self.directories.output_dir,
-                'preprocessing', '_run_id_*_subject_id_{subject_id}',
+                'preprocessing', '_subject_id_{subject_id}', '_run_id_*',
                 'uasub-{subject_id}_task-MGT_run-*_bold.nii'),
             'param' : join(self.directories.output_dir,
-                'preprocessing', '_run_id_*_subject_id_{subject_id}',
+                'preprocessing', '_subject_id_{subject_id}', '_run_id_*',
                 'rp_asub-{subject_id}_task-MGT_run-*_bold.txt'),
             'wc2' : join(self.directories.output_dir,
-                'preprocessing', '_run_id_01_subject_id_{subject_id}',
+                'preprocessing', '_subject_id_{subject_id}',
                 'wc2sub-{subject_id}_T1w.nii'),
             'events' : join(self.directories.dataset_dir, 'sub-{subject_id}', 'func',
                 'sub-{subject_id}_task-MGT_run-*_events.tsv')
         }
         select_files = Node(SelectFiles(templates),name = 'select_files')
         select_files.inputs.base_directory = self.directories.results_dir
+        subject_level_analysis.connect(
+            information_source, 'subject_id', select_files, 'subject_id')
 
-        # DataSink Node - store the wanted results in the wanted repository
-        data_sink = Node(DataSink(), name = 'data_sink')
-        data_sink.inputs.base_directory = self.directories.output_dir
-
-        # Get Subject Info - get subject specific condition information
+        # FUNCTION node get_subject_information - get subject specific condition information
         subject_information = MapNode(Function(
             function = self.get_subject_information,
             input_names = ['event_file', 'short_run_id'],
             output_names = ['subject_info']),
             name = 'subject_information', iterfield = ['event_file', 'short_run_id'])
         subject_information.inputs.short_run_id = list(range(1, len(self.run_list) + 1))
+        subject_level_analysis.connect(select_files, 'events', subject_information, 'event_file')
 
-        # Get parameters
+        # FUNCTION node get_parameters_file - Get subject parameters
         parameters = MapNode(Function(
             function = self.get_parameters_file,
             input_names = [
@@ -556,57 +584,57 @@ def get_subject_level_analysis(self):
             name = 'parameters')
         parameters.inputs.run_id = self.run_list
         parameters.inputs.working_dir = self.directories.working_dir
+        subject_level_analysis.connect(information_source, 'subject_id', parameters, 'subject_id')
+        subject_level_analysis.connect(select_files, 'motion_correction', parameters, 'func_file')
+        subject_level_analysis.connect(select_files, 'param', parameters, 'parameters_file')
+        subject_level_analysis.connect(select_files, 'wc2', parameters, 'wc2_file')
 
-        # SpecifyModel - Generates SPM-specific Model
+        # SPECIFY MODEL - Generates SPM-specific Model
         specify_model = Node(SpecifySPMModel(), name = 'specify_model')
         specify_model.inputs.concatenate_runs = False
         specify_model.inputs.input_units = 'secs'
         specify_model.inputs.output_units = 'secs'
         specify_model.inputs.time_repetition = TaskInformation()['RepetitionTime']
         specify_model.inputs.high_pass_filter_cutoff = 128
+        subject_level_analysis.connect(select_files, 'func', specify_model, 'functional_runs')
+        subject_level_analysis.connect(
+            subject_information, 'subject_info', specify_model, 'subject_info')
+        subject_level_analysis.connect(
+            parameters, 'new_parameters_file', specify_model, 'realignment_parameters')
 
-        # Level1Design - Generates an SPM design matrix
+        # LEVEL1DESIGN - Generates an SPM design matrix
         model_design = Node(Level1Design(), name = 'model_design')
         model_design.inputs.bases = {'hrf': {'derivs': [1, 1]}}
         model_design.inputs.timing_units = 'secs'
         model_design.inputs.interscan_interval = TaskInformation()['RepetitionTime']
+        model_design.inputs.volterra_expansion_order = 2
+        subject_level_analysis.connect(specify_model, 'session_info', model_design, 'session_info')
 
-        # EstimateModel - estimate the parameters of the model
+        # ESTIMATE MODEL - estimate the parameters of the model
         model_estimate = Node(EstimateModel(), name = 'model_estimate')
         model_estimate.inputs.estimation_method = {'Classical': 1}
+        subject_level_analysis.connect(
+            model_design, 'spm_mat_file', model_estimate, 'spm_mat_file')
 
-        # EstimateContrast - estimates contrasts
+        # ESTIMATE CONTRAST - estimates contrasts
         contrast_estimate = Node(EstimateContrast(), name = 'contrast_estimate')
         contrast_estimate.inputs.contrasts = self.subject_level_contrasts
-
-        # Create l1 analysis workflow and connect its nodes
-        subject_level_analysis = Workflow(
-            base_dir = self.directories.working_dir,
-            name = 'subject_level_analysis'
-        )
-        subject_level_analysis.connect([
-            (information_source, select_files, [('subject_id', 'subject_id')]),
-            (information_source, parameters, [('subject_id', 'subject_id')]),
-            (select_files, subject_information, [('events', 'event_file')]),
-            (select_files, parameters, [
-                ('motion_correction', 'func_file'),
-                ('param', 'parameters_file'),
-                ('wc2', 'wc2_file')]),
-            (select_files, specify_model, [('func', 'functional_runs')]),
-            (subject_information, specify_model, [('subject_info', 'subject_info')]),
-            (parameters, specify_model, [
-                ('new_parameters_file', 'realignment_parameters')]),
-            (specify_model, model_design, [('session_info', 'session_info')]),
-            (model_design, model_estimate, [('spm_mat_file', 'spm_mat_file')]),
-            (model_estimate, contrast_estimate, [
-                ('spm_mat_file', 'spm_mat_file'),
-                ('beta_images', 'beta_images'),
-                ('residual_image', 'residual_image')]),
-            (contrast_estimate, data_sink, [
-                ('con_images', 'subject_level_analysis.@con_images'),
-                ('spmT_images', 'subject_level_analysis.@spmT_images'),
-                ('spm_mat_file', 'subject_level_analysis.@spm_mat_file')])
-        ])
+        subject_level_analysis.connect(
+            model_estimate, 'spm_mat_file', contrast_estimate, 'spm_mat_file')
+        subject_level_analysis.connect(
+            model_estimate, 'beta_images', contrast_estimate, 'beta_images')
+        subject_level_analysis.connect(
+            model_estimate, 'residual_image', contrast_estimate, 'residual_image')
+
+        # DATASINK - store the wanted results in the wanted repository
+        data_sink = Node(DataSink(), name = 'data_sink')
+        data_sink.inputs.base_directory = self.directories.output_dir
+        subject_level_analysis.connect(
+            contrast_estimate, 'con_images', data_sink, 'subject_level_analysis.@con_images')
+        subject_level_analysis.connect(
+            contrast_estimate, 'spmT_images', data_sink, 'subject_level_analysis.@spmT_images')
+        subject_level_analysis.connect(
+            contrast_estimate, 'spm_mat_file', data_sink, 'subject_level_analysis.@spm_mat_file')
 
         return subject_level_analysis
 
@@ -661,26 +689,28 @@ def get_group_level_analysis_sub_workflow(self, method):
         # Compute the number of participants used to do the analysis
         nb_subjects = len(self.subject_list)
 
-        # Infosource - a function free node to iterate over the list of subject names
+        # Init workflow
+        group_level_analysis = Workflow(
+            base_dir = self.directories.working_dir,
+            name = f'group_level_analysis_{method}_nsub_{nb_subjects}')
+
+        # IDENTITY INTERFACE - iterate over the list of contrasts
         information_source = Node(
             IdentityInterface(
                 fields = ['contrast_id', 'subjects']),
                 name = 'information_source')
         information_source.iterables = [('contrast_id', self.contrast_list)]
 
-        # SelectFiles
+        # SELECT FILES - get contrast files from subject-level output
         templates = {
             'contrasts' : join(self.directories.output_dir,
                 'subject_level_analysis', '_subject_id_*', 'con_{contrast_id}.nii')
         }
-
         select_files = Node(SelectFiles(templates), name = 'select_files')
         select_files.inputs.base_directory = self.directories.results_dir
         select_files.inputs.force_list = True
-
-        # Datasink - save important files
-        data_sink = Node(DataSink(), name = 'data_sink')
-        data_sink.inputs.base_directory = self.directories.output_dir
+        group_level_analysis.connect(
+            information_source, 'contrast_id', select_files, 'contrast_id')
 
         # Function Node get_equal_range_subjects
         #   Get subjects in the equalRange group and in the subject_list
@@ -722,16 +752,23 @@ def get_group_level_analysis_sub_workflow(self, method):
                 ), name = 'get_contrasts', iterfield = 'input_str'
             )
+        group_level_analysis.connect(select_files, 'contrasts', get_contrasts, 'input_str')
 
-        # Estimate model
+        # ESTIMATE MODEL (inputs will be set later, depending on the method)
         estimate_model = Node(EstimateModel(), name = 'estimate_model')
         estimate_model.inputs.estimation_method = {'Classical':1}
 
-        # Estimate contrasts
+        # ESTIMATE CONTRAST
         estimate_contrast = Node(EstimateContrast(), name = 'estimate_contrast')
         estimate_contrast.inputs.group_contrast = True
-
-        # Create thresholded maps
+        group_level_analysis.connect(
+            estimate_model, 'spm_mat_file', estimate_contrast, 'spm_mat_file')
+        group_level_analysis.connect(
+            estimate_model, 'residual_image', estimate_contrast, 'residual_image')
+        group_level_analysis.connect(
+            estimate_model, 'beta_images', estimate_contrast, 'beta_images')
+
+        # THRESHOLD - Create thresholded maps
         threshold = MapNode(Threshold(), name = 'threshold',
             iterfield = ['stat_image', 'contrast_index'])
         threshold.inputs.use_fwe_correction = False
@@ -740,28 +777,8 @@ def get_group_level_analysis_sub_workflow(self, method):
         threshold.inputs.use_topo_fdr = False
         threshold.inputs.force_activation = True
         threshold.synchronize = True
-
-        group_level_analysis = Workflow(
-            base_dir = self.directories.working_dir,
-            name = f'group_level_analysis_{method}_nsub_{nb_subjects}')
-        group_level_analysis.connect([
-            (information_source, select_files, [('contrast_id', 'contrast_id')]),
-            (select_files, get_contrasts, [('contrasts', 'input_str')]),
-            (estimate_model, estimate_contrast, [
-                ('spm_mat_file', 'spm_mat_file'),
-                ('residual_image', 'residual_image'),
-                ('beta_images', 'beta_images')]),
-            (estimate_contrast, threshold, [
-                ('spm_mat_file', 'spm_mat_file'),
-                ('spmT_images', 'stat_image')]),
-            (threshold, data_sink, [
-                ('thresholded_map', f'group_level_analysis_{method}_nsub_{nb_subjects}.@thresh')]),
-            (estimate_model, data_sink, [
-                ('mask_image', f'group_level_analysis_{method}_nsub_{nb_subjects}.@mask')]),
-            (estimate_contrast, data_sink, [
-                ('spm_mat_file', f'group_level_analysis_{method}_nsub_{nb_subjects}.@spm_mat'),
-                ('spmT_images', f'group_level_analysis_{method}_nsub_{nb_subjects}.@T'),
-                ('con_images', f'group_level_analysis_{method}_nsub_{nb_subjects}.@con')])])
+        group_level_analysis.connect(estimate_contrast, 'spm_mat_file', threshold, 'spm_mat_file')
+        group_level_analysis.connect(estimate_contrast, 'spmT_images', threshold, 'stat_image')
 
         if method in ('equalRange', 'equalIndifference'):
             estimate_contrast.inputs.contrasts = [
                 ('Group', 'T', ['mean'], [1]),
                 ('Group', 'T', ['mean'], [-1])
             ]
 
-            threshold.inputs.contrast_index = [1, 2]
-
-            # Specify design matrix
+            # ONE SAMPLE T TEST DESIGN - Create the design matrix
             one_sample_t_test_design = Node(OneSampleTTestDesign(),
                 name = 'one_sample_t_test_design')
+            group_level_analysis.connect(
+                get_contrasts, ('out_list', clean_list), one_sample_t_test_design, 'in_files')
+            group_level_analysis.connect(
+                one_sample_t_test_design, 'spm_mat_file', estimate_model, 'spm_mat_file')
 
-            group_level_analysis.connect([
-                (get_contrasts, one_sample_t_test_design, [
-                    (('out_list', clean_list), 'in_files')
-                ]),
-                (one_sample_t_test_design, estimate_model, [('spm_mat_file', 'spm_mat_file')])
-            ])
+            threshold.inputs.contrast_index = [1, 2]
 
         if method == 'equalRange':
             group_level_analysis.connect(
@@ -795,12 +809,14 @@
             )
 
         elif method == 'groupComp':
+            group_level_analysis.connect(
+                get_equal_range_subjects, ('out_list', complete_subject_ids),
+                get_contrasts, 'elements')
+
             estimate_contrast.inputs.contrasts = [
                 ('Eq range vs Eq indiff in loss', 'T', ['Group_{1}', 'Group_{2}'], [1, -1])
             ]
 
-            threshold.inputs.contrast_index = [1]
-
             # Function Node elements_in_string
             #   Get contrast files for required subjects
             # Note : using a MapNode with elements_in_string requires using clean_list to remove
@@ -812,27 +828,43 @@ def get_group_level_analysis_sub_workflow(self, method):
                 ), name = 'get_contrasts_2', iterfield = 'input_str'
             )
+            group_level_analysis.connect(select_files, 'contrasts', get_contrasts_2, 'input_str')
+            group_level_analysis.connect(
+                get_equal_indifference_subjects, ('out_list', complete_subject_ids),
+                get_contrasts_2, 'elements')
 
-            # Node for the design matrix
+            # TWO SAMPLE T TEST DESIGN - Create the design matrix
             two_sample_t_test_design = Node(TwoSampleTTestDesign(),
                 name = 'two_sample_t_test_design')
+            group_level_analysis.connect(
+                get_contrasts, ('out_list', clean_list),
+                two_sample_t_test_design, 'group1_files')
+            group_level_analysis.connect(
+                get_contrasts_2, ('out_list', clean_list),
+                two_sample_t_test_design, 'group2_files')
+            group_level_analysis.connect(
+                two_sample_t_test_design, 'spm_mat_file', estimate_model, 'spm_mat_file')
 
-            group_level_analysis.connect([
-                (select_files, get_contrasts_2, [('contrasts', 'input_str')]),
-                (get_equal_range_subjects, get_contrasts, [
-                    (('out_list', complete_subject_ids), 'elements')
-                ]),
-                (get_equal_indifference_subjects, get_contrasts_2, [
-                    (('out_list', complete_subject_ids), 'elements')
-                ]),
-                (get_contrasts, two_sample_t_test_design, [
-                    (('out_list', clean_list), 'group1_files')
-                ]),
-                (get_contrasts_2, two_sample_t_test_design, [
-                    (('out_list', clean_list), 'group2_files')
-                ]),
-                (two_sample_t_test_design, estimate_model, [('spm_mat_file', 'spm_mat_file')])
-            ])
+            threshold.inputs.contrast_index = [1]
+
+        # Datasink - save important files
+        data_sink = Node(DataSink(), name = 'data_sink')
+        data_sink.inputs.base_directory = self.directories.output_dir
+        group_level_analysis.connect(
+            threshold, 'thresholded_map',
+            data_sink, f'group_level_analysis_{method}_nsub_{nb_subjects}.@thresh')
+        group_level_analysis.connect(
+            estimate_model, 'mask_image',
+            data_sink, f'group_level_analysis_{method}_nsub_{nb_subjects}.@mask')
+        group_level_analysis.connect(
+            estimate_contrast, 'spm_mat_file',
+            data_sink, f'group_level_analysis_{method}_nsub_{nb_subjects}.@spm_mat')
+        group_level_analysis.connect(
+            estimate_contrast, 'spmT_images',
+            data_sink, f'group_level_analysis_{method}_nsub_{nb_subjects}.@T')
+        group_level_analysis.connect(
+            estimate_contrast, 'con_images',
+            data_sink, f'group_level_analysis_{method}_nsub_{nb_subjects}.@con')
 
         return group_level_analysis
 
diff --git a/tests/pipelines/test_team_98BT.py b/tests/pipelines/test_team_98BT.py
index ad09d2bf..95024642 100644
--- a/tests/pipelines/test_team_98BT.py
+++ b/tests/pipelines/test_team_98BT.py
@@ -55,11 +55,11 @@ def test_outputs():
         pipeline = PipelineTeam98BT()
         # 1 - 1 subject outputs
         pipeline.subject_list = ['001']
-        helpers.test_pipeline_outputs(pipeline, [1 + 1*1 + 4*1*5,0,9,84,18])
+        helpers.test_pipeline_outputs(pipeline, [1 + 1*1 + 4*1*4,0,9,84,18])
 
         # 2 - 4 subjects outputs
         pipeline.subject_list = ['001', '002', '003', '004']
-        helpers.test_pipeline_outputs(pipeline, [1 + 4*1 + 4*4*5,0,36,84,18])
+        helpers.test_pipeline_outputs(pipeline, [1 + 4*1 + 4*4*4,0,36,84,18])
 
     @staticmethod
     @mark.unit_test
@@ -135,7 +135,7 @@ def test_subject_information():
         assert bunch.regressors is None
         pmod = bunch.pmod[0]
         assert isinstance(pmod, Bunch)
-        assert pmod.name == ['gain_run1', 'loss_run1', 'answers_run1']
+        assert pmod.name == ['gain', 'loss', 'answers']
         assert pmod.poly == [1, 1, 1]
         helpers.compare_float_2d_arrays(pmod.param, [
             [14.0, 34.0, 38.0, 10.0, 16.0],
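
Note for readers less familiar with the Nipype API this patch leans on: most hunks replace one large Workflow.connect([...]) call, listing every edge at the end of the method, with one connect(source, 'out_port', destination, 'in_port') call placed right after the node it wires. The two forms are equivalent; below is a minimal sketch of both, assuming only that nipype is installed (node and field names are illustrative, not taken from the pipeline).

from nipype import Node, Workflow
from nipype.interfaces.utility import IdentityInterface

source = Node(IdentityInterface(fields = ['value']), name = 'source')
target = Node(IdentityInterface(fields = ['value']), name = 'target')
workflow = Workflow(name = 'sketch', base_dir = '/tmp')

# List form (the style removed by this patch): all edges in one call.
#   workflow.connect([(source, target, [('value', 'value')])])

# Per-edge form (the style adopted here): one edge per call, declared
# next to the node definition it concerns.
workflow.connect(source, 'value', target, 'value')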
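The dartel hunk also swaps a pass-through Function node for a JoinNode wrapping an IdentityInterface, the idiomatic way to collect one output per subject iteration into a single list. A hedged sketch of that join pattern, with illustrative node and field names:

from nipype import JoinNode, Node, Workflow
from nipype.interfaces.utility import IdentityInterface

# Iterating node: the workflow expands it into one copy per subject.
subjects = Node(IdentityInterface(fields = ['subject_id']), name = 'subjects')
subjects.iterables = ('subject_id', ['001', '002'])

# JoinNode: merges the iterated branch back into a single list input,
# with no pass-through Function node needed.
join = JoinNode(
    IdentityInterface(fields = ['subject_ids']),
    name = 'join',
    joinsource = 'subjects',
    joinfield = 'subject_ids')

workflow = Workflow(name = 'join_sketch', base_dir = '/tmp')
workflow.connect(subjects, 'subject_id', join, 'subject_ids')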
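On the contrast-name fix at the top of the patch: SPM names a parametrically modulated regressor '<condition>x<modulator>^<order>', so once the pmod names in get_subject_information become 'gain', 'loss' and 'answers', the run-1 gain regressor is 'gamble_run1xgain^1' - which is what the subject-level contrasts now reference. A minimal sketch of the Bunch layout this relies on (onset, duration and weight values are illustrative, not taken from the dataset):

from nipype.interfaces.base import Bunch

# One run's design information. With condition 'gamble_run1' and pmod name
# 'gain', SPM builds a regressor named 'gamble_run1xgain^1'.
subject_info = Bunch(
    conditions = ['gamble_run1'],
    onsets = [[4.0, 8.0, 12.0]],
    durations = [[4.0, 4.0, 4.0]],
    pmod = [Bunch(
        name = ['gain', 'loss', 'answers'],
        poly = [1, 1, 1],
        param = [[14.0, 34.0, 38.0], [6.0, 14.0, 19.0], [1, -2, 2]])])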