From ab887b2f3831cb2a2db6256c3ced2b6280804622 Mon Sep 17 00:00:00 2001 From: Taylor Salo Date: Fri, 6 Oct 2023 11:11:19 -0400 Subject: [PATCH] Keep working on config. --- xcp_d/cli/run.py | 432 +++++------------------------------------- xcp_d/cli/workflow.py | 80 ++++---- 2 files changed, 90 insertions(+), 422 deletions(-) diff --git a/xcp_d/cli/run.py b/xcp_d/cli/run.py index c519af012..14504e28d 100644 --- a/xcp_d/cli/run.py +++ b/xcp_d/cli/run.py @@ -32,7 +32,8 @@ def _validate_parameters(): # Check the validity of inputs if config.execution.output_dir == config.execution.fmri_dir: rec_path = ( - config.execution.fmri_dir / "derivatives" / f"xcp_d-{config.environment.version.split('+')[0]}" + config.execution.fmri_dir / "derivatives" / + f"xcp_d-{config.environment.version.split('+')[0]}" ) config.loggers.cli.error( "The selected output folder is the same as the input fmri input. " @@ -41,29 +42,31 @@ def _validate_parameters(): ) return_code = 1 - if opts.analysis_level != "participant": + if config.workflow.analysis_level != "participant": config.loggers.cli.error('Please select analysis level "participant"') return_code = 1 # Bandpass filter parameters - if opts.lower_bpf <= 0 and opts.upper_bpf <= 0: - opts.bandpass_filter = False + if config.workflow.lower_bpf <= 0 and config.workflow.upper_bpf <= 0: + config.workflow.bandpass_filter = False if ( - opts.bandpass_filter - and (opts.lower_bpf >= opts.upper_bpf) - and (opts.lower_bpf > 0 and opts.upper_bpf > 0) + config.workflow.bandpass_filter + and (config.workflow.lower_bpf >= config.workflow.upper_bpf) + and (config.workflow.lower_bpf > 0 and config.workflow.upper_bpf > 0) ): config.loggers.cli.error( - f"'--lower-bpf' ({opts.lower_bpf}) must be lower than " - f"'--upper-bpf' ({opts.upper_bpf})." + f"'--lower-bpf' ({config.workflow.lower_bpf}) must be lower than " + f"'--upper-bpf' ({config.workflow.upper_bpf})." 
) return_code = 1 - elif not opts.bandpass_filter: - config.loggers.cli.warning("Bandpass filtering is disabled. ALFF outputs will not be generated.") + elif not config.workflow.bandpass_filter: + config.loggers.cli.warning( + "Bandpass filtering is disabled. ALFF outputs will not be generated." + ) # Scrubbing parameters - if opts.fd_thresh <= 0: + if config.workflow.fd_thresh <= 0: ignored_params = "\n\t".join( [ "--min-time", @@ -78,79 +81,81 @@ def _validate_parameters(): "Framewise displacement-based scrubbing is disabled. " f"The following parameters will have no effect:\n\t{ignored_params}" ) - opts.min_time = 0 - opts.motion_filter_type = None - opts.band_stop_min = None - opts.band_stop_max = None - opts.motion_filter_order = None + config.workflow.min_time = 0 + config.workflow.motion_filter_type = None + config.workflow.band_stop_min = None + config.workflow.band_stop_max = None + config.workflow.motion_filter_order = None # Motion filtering parameters - if opts.motion_filter_type == "notch": - if not (opts.band_stop_min and opts.band_stop_max): + if config.workflow.motion_filter_type == "notch": + if not (config.workflow.band_stop_min and config.workflow.band_stop_max): config.loggers.cli.error( "Please set both '--band-stop-min' and '--band-stop-max' if you want to apply " "the 'notch' motion filter." ) return_code = 1 - elif opts.band_stop_min >= opts.band_stop_max: + elif config.workflow.band_stop_min >= config.workflow.band_stop_max: config.loggers.cli.error( - f"'--band-stop-min' ({opts.band_stop_min}) must be lower than " - f"'--band-stop-max' ({opts.band_stop_max})." + f"'--band-stop-min' ({config.workflow.band_stop_min}) must be lower than " + f"'--band-stop-max' ({config.workflow.band_stop_max})." 
) return_code = 1 - elif opts.band_stop_min < 1 or opts.band_stop_max < 1: + elif config.workflow.band_stop_min < 1 or config.workflow.band_stop_max < 1: config.loggers.cli.warning( - f"Either '--band-stop-min' ({opts.band_stop_min}) or " - f"'--band-stop-max' ({opts.band_stop_max}) is suspiciously low. " + f"Either '--band-stop-min' ({config.workflow.band_stop_min}) or " + f"'--band-stop-max' ({config.workflow.band_stop_max}) is suspiciously low. " "Please remember that these values should be in breaths-per-minute." ) - elif opts.motion_filter_type == "lp": - if not opts.band_stop_min: + elif config.workflow.motion_filter_type == "lp": + if not config.workflow.band_stop_min: config.loggers.cli.error( "Please set '--band-stop-min' if you want to apply the 'lp' motion filter." ) return_code = 1 - elif opts.band_stop_min < 1: + elif config.workflow.band_stop_min < 1: config.loggers.cli.warning( - f"'--band-stop-min' ({opts.band_stop_max}) is suspiciously low. " + f"'--band-stop-min' ({config.workflow.band_stop_min}) is suspiciously low. " "Please remember that this value should be in breaths-per-minute." ) - if opts.band_stop_max: - config.loggers.cli.warning("'--band-stop-max' is ignored when '--motion-filter-type' is 'lp'.") + if config.workflow.band_stop_max: + config.loggers.cli.warning( + "'--band-stop-max' is ignored when '--motion-filter-type' is 'lp'." + ) - elif opts.band_stop_min or opts.band_stop_max: + elif config.workflow.band_stop_min or config.workflow.band_stop_max: config.loggers.cli.warning( "'--band-stop-min' and '--band-stop-max' are ignored if '--motion-filter-type' " "is not set." ) # Some parameters are automatically set depending on the input type. - if opts.input_type in ("dcan", "hcp"): - if not opts.cifti: + if config.workflow.input_type in ("dcan", "hcp"): + if not config.workflow.cifti: config.loggers.cli.warning( - f"With input_type {opts.input_type}, cifti processing (--cifti) will be " - "enabled automatically." 
+ f"With input_type {config.workflow.input_type}, " + "cifti processing (--cifti) will be enabled automatically." ) - opts.cifti = True + config.workflow.cifti = True - if not opts.process_surfaces: + if not config.workflow.process_surfaces: config.loggers.cli.warning( - f"With input_type {opts.input_type}, surface normalization " + f"With input_type {config.workflow.input_type}, surface normalization " "(--warp-surfaces-native2std) will be enabled automatically." ) - opts.process_surfaces = True + config.workflow.process_surfaces = True # process_surfaces and nifti processing are incompatible. - if opts.process_surfaces and not opts.cifti: + if config.workflow.process_surfaces and not config.workflow.cifti: config.loggers.cli.error( "In order to perform surface normalization (--warp-surfaces-native2std), " "you must enable cifti processing (--cifti)." ) return_code = 1 - return opts, return_code + return return_code def main(args=None, namespace=None): @@ -313,349 +318,6 @@ def main(args=None, namespace=None): sys.exit(int((errno + failed_reports) > 0)) -def _validate_parameters(opts, config.loggers.cli): - """Validate parameters. - - This function was abstracted out of build_workflow to make testing easier. - """ - opts.fmri_dir = opts.fmri_dir.resolve() - opts.output_dir = opts.output_dir.resolve() - opts.work_dir = opts.work_dir.resolve() - - return_code = 0 - - # Set the FreeSurfer license - if opts.fs_license_file is not None: - opts.fs_license_file = opts.fs_license_file.resolve() - if opts.fs_license_file.is_file(): - os.environ["FS_LICENSE"] = str(opts.fs_license_file) - - else: - config.loggers.cli.error(f"Freesurfer license DNE: {opts.fs_license_file}.") - return_code = 1 - - # Check the validity of inputs - if opts.output_dir == opts.fmri_dir: - rec_path = ( - opts.fmri_dir / "derivatives" / f"xcp_d-{config.environment.version.split('+')[0]}" - ) - config.loggers.cli.error( - "The selected output folder is the same as the input fmri input. 
" - "Please modify the output path " - f"(suggestion: {rec_path})." - ) - return_code = 1 - - if opts.analysis_level != "participant": - config.loggers.cli.error('Please select analysis level "participant"') - return_code = 1 - - # Bandpass filter parameters - if opts.lower_bpf <= 0 and opts.upper_bpf <= 0: - opts.bandpass_filter = False - - if ( - opts.bandpass_filter - and (opts.lower_bpf >= opts.upper_bpf) - and (opts.lower_bpf > 0 and opts.upper_bpf > 0) - ): - config.loggers.cli.error( - f"'--lower-bpf' ({opts.lower_bpf}) must be lower than " - f"'--upper-bpf' ({opts.upper_bpf})." - ) - return_code = 1 - elif not opts.bandpass_filter: - config.loggers.cli.warning("Bandpass filtering is disabled. ALFF outputs will not be generated.") - - # Scrubbing parameters - if opts.fd_thresh <= 0: - ignored_params = "\n\t".join( - [ - "--min-time", - "--motion-filter-type", - "--band-stop-min", - "--band-stop-max", - "--motion-filter-order", - "--head_radius", - ] - ) - config.loggers.cli.warning( - "Framewise displacement-based scrubbing is disabled. " - f"The following parameters will have no effect:\n\t{ignored_params}" - ) - opts.min_time = 0 - opts.motion_filter_type = None - opts.band_stop_min = None - opts.band_stop_max = None - opts.motion_filter_order = None - - # Motion filtering parameters - if opts.motion_filter_type == "notch": - if not (opts.band_stop_min and opts.band_stop_max): - config.loggers.cli.error( - "Please set both '--band-stop-min' and '--band-stop-max' if you want to apply " - "the 'notch' motion filter." - ) - return_code = 1 - elif opts.band_stop_min >= opts.band_stop_max: - config.loggers.cli.error( - f"'--band-stop-min' ({opts.band_stop_min}) must be lower than " - f"'--band-stop-max' ({opts.band_stop_max})." - ) - return_code = 1 - elif opts.band_stop_min < 1 or opts.band_stop_max < 1: - config.loggers.cli.warning( - f"Either '--band-stop-min' ({opts.band_stop_min}) or " - f"'--band-stop-max' ({opts.band_stop_max}) is suspiciously low. 
" - "Please remember that these values should be in breaths-per-minute." - ) - - elif opts.motion_filter_type == "lp": - if not opts.band_stop_min: - config.loggers.cli.error( - "Please set '--band-stop-min' if you want to apply the 'lp' motion filter." - ) - return_code = 1 - elif opts.band_stop_min < 1: - config.loggers.cli.warning( - f"'--band-stop-min' ({opts.band_stop_max}) is suspiciously low. " - "Please remember that this value should be in breaths-per-minute." - ) - - if opts.band_stop_max: - config.loggers.cli.warning("'--band-stop-max' is ignored when '--motion-filter-type' is 'lp'.") - - elif opts.band_stop_min or opts.band_stop_max: - config.loggers.cli.warning( - "'--band-stop-min' and '--band-stop-max' are ignored if '--motion-filter-type' " - "is not set." - ) - - # Some parameters are automatically set depending on the input type. - if opts.input_type in ("dcan", "hcp"): - if not opts.cifti: - config.loggers.cli.warning( - f"With input_type {opts.input_type}, cifti processing (--cifti) will be " - "enabled automatically." - ) - opts.cifti = True - - if not opts.process_surfaces: - config.loggers.cli.warning( - f"With input_type {opts.input_type}, surface normalization " - "(--warp-surfaces-native2std) will be enabled automatically." - ) - opts.process_surfaces = True - - # process_surfaces and nifti processing are incompatible. - if opts.process_surfaces and not opts.cifti: - config.loggers.cli.error( - "In order to perform surface normalization (--warp-surfaces-native2std), " - "you must enable cifti processing (--cifti)." - ) - return_code = 1 - - return opts, return_code - - -def build_workflow(opts, retval): - """Create the Nipype workflow that supports the whole execution graph, given the inputs. 
- - All the checks and the construction of the workflow are done - inside this function that has pickleable inputs and output - dictionary (``retval``) to allow isolation using a - ``multiprocessing.Process`` that allows fmriprep to enforce - a hard-limited memory-scope. - """ - from bids import BIDSLayout - from nipype import config as ncfg - from nipype import logging as nlogging - - from xcp_d.utils.bids import collect_participants - from xcp_d.workflows.base import init_xcpd_wf - - log_level = int(max(25 - 5 * opts.verbose_count, logging.DEBUG)) - - config.loggers.cli = nlogging.getLogger("nipype.workflow") - config.loggers.cli.setLevel(log_level) - nlogging.getLogger("nipype.interface").setLevel(log_level) - nlogging.getLogger("nipype.utils").setLevel(log_level) - - opts, retval["return_code"] = _validate_parameters(opts, config.loggers.cli) - - if retval["return_code"] == 1: - return retval - - if opts.clean_workdir: - from niworkflows.utils.misc import clean_directory - - config.loggers.cli.info(f"Clearing previous xcp_d working directory: {opts.work_dir}") - if not clean_directory(opts.work_dir): - config.loggers.cli.warning( - f"Could not clear all contents of working directory: {opts.work_dir}" - ) - - retval["return_code"] = 1 - retval["workflow"] = None - retval["fmri_dir"] = str(opts.fmri_dir) - retval["output_dir"] = str(opts.output_dir) - retval["work_dir"] = str(opts.work_dir) - - # First check that fmriprep_dir looks like a BIDS folder - if opts.input_type in ("dcan", "hcp"): - if opts.input_type == "dcan": - from xcp_d.utils.dcan2fmriprep import convert_dcan2bids as convert_to_bids - elif opts.input_type == "hcp": - from xcp_d.utils.hcp2fmriprep import convert_hcp2bids as convert_to_bids - - NIWORKFLOWS_LOG.info(f"Converting {opts.input_type} to fmriprep format") - converted_fmri_dir = os.path.join( - opts.work_dir, - f"dset_bids/derivatives/{opts.input_type}", - ) - os.makedirs(converted_fmri_dir, exist_ok=True) - - convert_to_bids( - 
opts.fmri_dir, - out_dir=converted_fmri_dir, - participant_ids=opts.participant_label, - ) - - opts.fmri_dir = Path(converted_fmri_dir) - - if not os.path.isfile((os.path.join(opts.fmri_dir, "dataset_description.json"))): - config.loggers.cli.error( - "No dataset_description.json file found in input directory. " - "Make sure to point to the specific pipeline's derivatives folder. " - "For example, use '/dset/derivatives/fmriprep', not /dset/derivatives'." - ) - retval["return_code"] = 1 - - # Set up some instrumental utilities - run_uuid = f"{strftime('%Y%m%d-%H%M%S')}_{uuid.uuid4()}" - retval["run_uuid"] = run_uuid - - layout = BIDSLayout(str(opts.fmri_dir), validate=False, derivatives=True) - subject_list = collect_participants(layout, participant_label=opts.participant_label) - retval["subject_list"] = subject_list - - # Load base plugin_settings from file if --use-plugin - if opts.use_plugin is not None: - from yaml import load as loadyml - - with open(opts.use_plugin) as f: - plugin_settings = loadyml(f) - - plugin_settings.setdefault("plugin_args", {}) - - else: - # Defaults - plugin_settings = { - "plugin": "MultiProc", - "plugin_args": { - "raise_insufficient": False, - "maxtasksperchild": 1, - }, - } - - # Permit overriding plugin config with specific CLI options - nthreads = opts.nthreads - omp_nthreads = opts.omp_nthreads - - if (nthreads == 1) or (omp_nthreads > nthreads): - omp_nthreads = 1 - - plugin_settings["plugin_args"]["n_procs"] = nthreads - - if 1 < nthreads < omp_nthreads: - config.loggers.cli.warning( - f"Per-process threads (--omp-nthreads={omp_nthreads}) exceed total " - f"threads (--nthreads/--n_cpus={nthreads})" - ) - - if opts.mem_gb: - plugin_settings["plugin_args"]["memory_gb"] = opts.mem_gb - - retval["plugin_settings"] = plugin_settings - - # Set up directories - log_dir = opts.output_dir / "xcp_d" / "logs" - - # Check and create output and working directories - opts.output_dir.mkdir(exist_ok=True, parents=True) - 
opts.work_dir.mkdir(exist_ok=True, parents=True) - log_dir.mkdir(exist_ok=True, parents=True) - - # Nipype config (logs and execution) - ncfg.update_config( - { - "logging": { - "log_directory": str(log_dir), - "log_to_file": True, - "workflow_level": log_level, - "interface_level": log_level, - "utils_level": log_level, - }, - "execution": { - "crashdump_dir": str(log_dir), - "crashfile_format": "txt", - "get_linked_libs": False, - }, - "monitoring": { - "enabled": opts.resource_monitor, - "sample_frequency": "0.5", - "summary_append": True, - }, - } - ) - - if opts.resource_monitor: - ncfg.enable_resource_monitor() - - # Build main workflow - config.loggers.cli.log( - 25, - f"""\ -Running xcp_d version {config.environment.version}: - * fMRI directory path: {opts.fmri_dir}. - * Participant list: {subject_list}. - * Run identifier: {run_uuid}. - -""", - ) - - retval["workflow"] = init_xcpd_wf( - subject_list=subject_list, - name="xcpd_wf", - ) - - boilerplate = retval["workflow"].visit_desc() - - if boilerplate: - citation_files = {ext: log_dir / f"CITATION.{ext}" for ext in ("bib", "tex", "md", "html")} - # To please git-annex users and also to guarantee consistency among different renderings - # of the same file, first remove any existing ones - for citation_file in citation_files.values(): - try: - citation_file.unlink() - except FileNotFoundError: - pass - - citation_files["md"].write_text(boilerplate) - - config.loggers.cli.log( - 25, - ( - "Works derived from this xcp_d execution should include the following boilerplate:\n\n" - f"{boilerplate}" - ), - ) - - retval["return_code"] = 0 - - return retval - - if __name__ == "__main__": raise RuntimeError( "xcp_d/cli/run.py should not be run directly;\n" diff --git a/xcp_d/cli/workflow.py b/xcp_d/cli/workflow.py index 70c3a9b75..778ffaa5c 100644 --- a/xcp_d/cli/workflow.py +++ b/xcp_d/cli/workflow.py @@ -10,6 +10,9 @@ def build_workflow(config_file, retval): ``multiprocessing.Process`` that allows fmriprep to 
enforce a hard-limited memory-scope. """ + import os + from pathlib import Path + from niworkflows.reports.core import generate_reports from niworkflows.utils.bids import check_pipeline_version, collect_participants from niworkflows.utils.misc import check_valid_fs_license @@ -24,44 +27,44 @@ def build_workflow(config_file, retval): output_dir = config.execution.output_dir version = config.environment.version - if opts.clean_workdir: + if config.execution.clean_workdir: from niworkflows.utils.misc import clean_directory - build_log.info(f"Clearing previous xcp_d working directory: {opts.work_dir}") - if not clean_directory(opts.work_dir): + build_log.info(f"Clearing previous xcp_d working directory: {config.execution.work_dir}") + if not clean_directory(config.execution.work_dir): build_log.warning( - f"Could not clear all contents of working directory: {opts.work_dir}" + f"Could not clear all contents of working directory: {config.execution.work_dir}" ) retval["return_code"] = 1 retval["workflow"] = None - retval["fmri_dir"] = str(opts.fmri_dir) - retval["output_dir"] = str(opts.output_dir) - retval["work_dir"] = str(opts.work_dir) + retval["fmri_dir"] = str(config.execution.fmri_dir) + retval["output_dir"] = str(config.execution.output_dir) + retval["work_dir"] = str(config.execution.work_dir) # First check that fmriprep_dir looks like a BIDS folder - if opts.input_type in ("dcan", "hcp"): - if opts.input_type == "dcan": + if config.workflow.input_type in ("dcan", "hcp"): + if config.workflow.input_type == "dcan": from xcp_d.utils.dcan2fmriprep import convert_dcan2bids as convert_to_bids - elif opts.input_type == "hcp": + elif config.workflow.input_type == "hcp": from xcp_d.utils.hcp2fmriprep import convert_hcp2bids as convert_to_bids - NIWORKFLOWS_LOG.info(f"Converting {opts.input_type} to fmriprep format") + config.loggers.cli.info(f"Converting {config.workflow.input_type} to fmriprep format") converted_fmri_dir = os.path.join( - opts.work_dir, - 
f"dset_bids/derivatives/{opts.input_type}", + config.execution.work_dir, + f"dset_bids/derivatives/{config.workflow.input_type}", ) os.makedirs(converted_fmri_dir, exist_ok=True) convert_to_bids( - opts.fmri_dir, + config.execution.fmri_dir, out_dir=converted_fmri_dir, - participant_ids=opts.participant_label, + participant_ids=config.execution.participant_label, ) - opts.fmri_dir = Path(converted_fmri_dir) + config.execution.fmri_dir = Path(converted_fmri_dir) - if not os.path.isfile((os.path.join(opts.fmri_dir, "dataset_description.json"))): + if not os.path.isfile((os.path.join(config.execution.fmri_dir, "dataset_description.json"))): build_log.error( "No dataset_description.json file found in input directory. " "Make sure to point to the specific pipeline's derivatives folder. " @@ -73,15 +76,18 @@ def build_workflow(config_file, retval): run_uuid = f"{strftime('%Y%m%d-%H%M%S')}_{uuid.uuid4()}" retval["run_uuid"] = run_uuid - layout = BIDSLayout(str(opts.fmri_dir), validate=False, derivatives=True) - subject_list = collect_participants(layout, participant_label=opts.participant_label) + layout = BIDSLayout(str(config.execution.fmri_dir), validate=False, derivatives=True) + subject_list = collect_participants( + layout, + participant_label=config.execution.participant_label, + ) retval["subject_list"] = subject_list # Load base plugin_settings from file if --use-plugin - if opts.use_plugin is not None: + if config.nipype.plugin is not None: from yaml import load as loadyml - with open(opts.use_plugin) as f: + with open(config.nipype.plugin) as f: plugin_settings = loadyml(f) plugin_settings.setdefault("plugin_args", {}) @@ -97,31 +103,31 @@ def build_workflow(config_file, retval): } # Permit overriding plugin config with specific CLI options - nthreads = opts.nthreads - omp_nthreads = opts.omp_nthreads + nprocs = config.nipype.nprocs + omp_nthreads = config.nipype.omp_nthreads - if (nthreads == 1) or (omp_nthreads > nthreads): + if (nprocs == 1) or (omp_nthreads 
> nprocs): omp_nthreads = 1 - plugin_settings["plugin_args"]["n_procs"] = nthreads + plugin_settings["plugin_args"]["n_procs"] = nprocs - if 1 < nthreads < omp_nthreads: + if 1 < nprocs < omp_nthreads: build_log.warning( - f"Per-process threads (--omp-nthreads={omp_nthreads}) exceed total " - f"threads (--nthreads/--n_cpus={nthreads})" + f"Per-process threads (--omp-nthreads={omp_nthreads}) exceed total " + f"threads (--nprocs/--n_cpus={nprocs})" ) - if opts.mem_gb: - plugin_settings["plugin_args"]["memory_gb"] = opts.mem_gb + if config.nipype.memory_gb: + plugin_settings["plugin_args"]["memory_gb"] = config.nipype.memory_gb retval["plugin_settings"] = plugin_settings # Set up directories - log_dir = opts.output_dir / "xcp_d" / "logs" + log_dir = config.execution.output_dir / "xcp_d" / "logs" # Check and create output and working directories - opts.output_dir.mkdir(exist_ok=True, parents=True) - opts.work_dir.mkdir(exist_ok=True, parents=True) + config.execution.output_dir.mkdir(exist_ok=True, parents=True) + config.execution.work_dir.mkdir(exist_ok=True, parents=True) log_dir.mkdir(exist_ok=True, parents=True) # Nipype config (logs and execution) @@ -140,14 +146,14 @@ def build_workflow(config_file, retval): "get_linked_libs": False, }, "monitoring": { - "enabled": opts.resource_monitor, + "enabled": config.nipype.resource_monitor, "sample_frequency": "0.5", "summary_append": True, }, } ) - if opts.resource_monitor: + if config.nipype.resource_monitor: ncfg.enable_resource_monitor() # Build main workflow @@ -155,7 +161,7 @@ def build_workflow(config_file, retval): 25, f"""\
Running xcp_d version {config.environment.version}: - * fMRI directory path: {opts.fmri_dir}. + * fMRI directory path: {config.execution.fmri_dir}. * Participant list: {subject_list}. * Run identifier: {run_uuid}. @@ -191,4 +197,4 @@ def build_workflow(config_file, retval): retval["return_code"] = 0 - return retval \ No newline at end of file + return retval