Commit 4a9a93c

Keep working on config.

tsalo committed Oct 6, 2023
1 parent ab887b2
Showing 2 changed files with 138 additions and 128 deletions.
20 changes: 12 additions & 8 deletions xcp_d/cli/run.py
@@ -32,8 +32,9 @@ def _validate_parameters():
# Check the validity of inputs
if config.execution.output_dir == config.execution.fmri_dir:
rec_path = (
config.execution.fmri_dir / "derivatives" /
f"xcp_d-{config.environment.version.split('+')[0]}"
config.execution.fmri_dir
/ "derivatives"
/ f"xcp_d-{config.environment.version.split('+')[0]}"
)
config.loggers.cli.error(
"The selected output folder is the same as the input fmri input. "
@@ -158,19 +159,16 @@ def _validate_parameters():
return return_code


def main(args=None, namespace=None):
def main():
"""Run the main workflow."""
import gc
import sys
from multiprocessing import Manager, Process
from os import EX_SOFTWARE
from pathlib import Path

from xcp_d.utils.bids import write_derivative_description

from multiprocessing import Manager, Process

from xcp_d.cli.parser import parse_args
from xcp_d.utils.bids import write_derivative_description

parse_args()

@@ -268,8 +266,10 @@ def main(args=None, namespace=None):

if "Workflow did not execute cleanly" not in str(e):
sentry_sdk.capture_exception(e)

config.loggers.workflow.critical("XCP-D failed: %s", e)
raise

else:
config.loggers.workflow.log(25, "XCP-D finished successfully!")
if not config.execution.notrack:
@@ -293,7 +293,9 @@
"Works derived from this XCP-D execution should include the "
f"boilerplate text found in {boiler_file}.",
)

errno = 0

finally:
from niworkflows.reports.core import generate_reports
from pkg_resources import resource_filename as pkgrf
@@ -307,14 +309,16 @@ def main(args=None, namespace=None):
packagename="xcp_d",
)
write_derivative_description(
config.execution.bids_dir, config.execution.output_dir / "xcp_d"
config.execution.bids_dir,
config.execution.output_dir / "xcp_d",
)

if failed_reports and not config.execution.notrack:
sentry_sdk.capture_message(
f"Report generation failed for {failed_reports} subjects",
level="error",
)

sys.exit(int((errno + failed_reports) > 0))


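Note on the run.py changes above: main() now imports Manager and Process from multiprocessing, and build_workflow(config_file, retval) in the next file fills a retval dictionary instead of returning a value. Together these suggest a pattern similar to fMRIPrep's, where the workflow graph is built in a forked child process that reports back through a shared dict. A minimal, hypothetical sketch of that pattern (function bodies and names are illustrative, not the actual xcp_d code):

from multiprocessing import Manager, Process


def build_workflow(config_file, retval):
    # Child process: build the (potentially memory-hungry) workflow graph
    # and report results back through the shared dict.
    retval["return_code"] = 0
    retval["workflow"] = f"workflow built from {config_file}"  # placeholder object


def main(config_file="xcp_d.toml"):
    with Manager() as mgr:
        retval = mgr.dict()  # proxy dict shared between parent and child
        proc = Process(target=build_workflow, args=(config_file, retval))
        proc.start()
        proc.join()
        # Copy values out of the proxy before the Manager shuts down.
        return_code = retval.get("return_code", 1)
        workflow = retval.get("workflow")
    if return_code != 0 or workflow is None:
        raise SystemExit(return_code or 1)
    print("Workflow ready:", workflow)


if __name__ == "__main__":
    main()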
246 changes: 126 additions & 120 deletions xcp_d/cli/workflow.py
@@ -27,22 +27,23 @@ def build_workflow(config_file, retval):
output_dir = config.execution.output_dir
version = config.environment.version

if config.execution.clean_workdir:
from niworkflows.utils.misc import clean_directory

build_log.info(f"Clearing previous xcp_d working directory: {config.execution.work_dir}")
if not clean_directory(config.execution.work_dir):
build_log.warning(
f"Could not clear all contents of working directory: {config.execution.work_dir}"
)

retval["return_code"] = 1
retval["workflow"] = None
retval["fmri_dir"] = str(config.execution.fmri_dir)
retval["output_dir"] = str(config.execution.output_dir)
retval["work_dir"] = str(config.execution.work_dir)

# First check that fmriprep_dir looks like a BIDS folder
# warn if older results exist: check for dataset_description.json in output folder
msg = check_pipeline_version(version, output_dir / "xcp_d" / "dataset_description.json")
if msg is not None:
build_log.warning(msg)

# Please note this is the input folder's dataset_description.json
dset_desc_path = config.execution.bids_dir / "dataset_description.json"
if dset_desc_path.exists():
from hashlib import sha256

desc_content = dset_desc_path.read_bytes()
config.execution.bids_description_hash = sha256(desc_content).hexdigest()

# First check that fmri_dir looks like a BIDS folder
if config.workflow.input_type in ("dcan", "hcp"):
if config.workflow.input_type == "dcan":
from xcp_d.utils.dcan2fmriprep import convert_dcan2bids as convert_to_bids
@@ -72,129 +73,134 @@ def build_workflow(config_file, retval):
)
retval["return_code"] = 1

# Set up some instrumental utilities
run_uuid = f"{strftime('%Y%m%d-%H%M%S')}_{uuid.uuid4()}"
retval["run_uuid"] = run_uuid

layout = BIDSLayout(str(config.execution.fmri_dir), validate=False, derivatives=True)
subject_list = collect_participants(
layout,
config.execution.layout,
participant_label=config.execution.participant_label,
)
retval["subject_list"] = subject_list

# Load base plugin_settings from file if --use-plugin
if config.nipype.plugin is not None:
from yaml import load as loadyml

with open(config.nipype.plugin) as f:
plugin_settings = loadyml(f)

plugin_settings.setdefault("plugin_args", {})

else:
# Defaults
plugin_settings = {
"plugin": "MultiProc",
"plugin_args": {
"raise_insufficient": False,
"maxtasksperchild": 1,
},
}

# Permit overriding plugin config with specific CLI options
nprocs = config.nipype.nprocs
omp_nthreads = config.nipype.omp_nthreads

if (nprocs == 1) or (omp_nthreads > nprocs):
omp_nthreads = 1

plugin_settings["plugin_args"]["n_procs"] = nprocs
# Called with reports only
if config.execution.reports_only:
from pkg_resources import resource_filename as pkgrf

build_log.log(25, f"Running --reports-only on participants {', '.join(subject_list)}")
retval["return_code"] = generate_reports(
subject_list,
config.execution.output_dir,
config.execution.run_uuid,
config=pkgrf("xcp_d", "data/reports-spec.yml"),
packagename="xcp_d",
)
return retval

if 1 < nprocs < omp_nthreads:
build_log.warning(
f"Per-process threads (--omp-nprocs={omp_nthreads}) exceed total "
f"threads (--nprocs/--n_cpus={nprocs})"
# Build main workflow
init_msg = f"""
Running XCP-D version {config.environment.version}:
* BIDS dataset path: {config.execution.bids_dir}.
* Participant list: {subject_list}.
* Run identifier: {config.execution.run_uuid}.
* Output spaces: {config.execution.output_spaces}."""

if config.execution.anat_derivatives:
init_msg += f"""
* Anatomical derivatives: {config.execution.anat_derivatives}."""
build_log.log(25, init_msg)

retval["workflow"] = init_xcpd_wf()

# Check for FS license after building the workflow
if not check_valid_fs_license():
build_log.critical(
"""\
ERROR: a valid license file is required for FreeSurfer to run. XCP-D looked for an existing \
license file at several paths, in this order: 1) command line argument ``--fs-license-file``; \
2) ``$FS_LICENSE`` environment variable; and 3) the ``$FREESURFER_HOME/license.txt`` path. Get it \
(for free) by registering at https://surfer.nmr.mgh.harvard.edu/registration.html"""
)
retval["return_code"] = 126 # 126 == Command invoked cannot execute.
return retval

# Check workflow for missing commands
missing = check_deps(retval["workflow"])
if missing:
build_log.critical(
"Cannot run XCP-D. Missing dependencies:%s",
"\n\t* ".join([""] + [f"{cmd} (Interface: {iface})" for iface, cmd in missing]),
)
retval["return_code"] = 127 # 127 == command not found.
return retval

if config.nipype.memory_gb:
plugin_settings["plugin_args"]["memory_gb"] = config.nipype.memory_gb

retval["plugin_settings"] = plugin_settings

# Set up directories
log_dir = config.execution.output_dir / "xcp_d" / "logs"

# Check and create output and working directories
config.execution.output_dir.mkdir(exist_ok=True, parents=True)
config.execution.work_dir.mkdir(exist_ok=True, parents=True)
log_dir.mkdir(exist_ok=True, parents=True)

# Nipype config (logs and execution)
ncfg.update_config(
{
"logging": {
"log_directory": str(log_dir),
"log_to_file": True,
"workflow_level": log_level,
"interface_level": log_level,
"utils_level": log_level,
},
"execution": {
"crashdump_dir": str(log_dir),
"crashfile_format": "txt",
"get_linked_libs": False,
},
"monitoring": {
"enabled": config.nipype.resource_monitor,
"sample_frequency": "0.5",
"summary_append": True,
},
}
config.to_filename(config_file)
build_log.info(
"XCP-D workflow graph with %d nodes built successfully.",
len(retval["workflow"]._get_all_nodes()),
)
retval["return_code"] = 0
return retval

if config.nipype.resource_monitor:
ncfg.enable_resource_monitor()

# Build main workflow
build_log.log(
25,
f"""\
Running xcp_d version {config.environment.version}:
* fMRI directory path: {config.execution.fmri_dir}.
* Participant list: {subject_list}.
* Run identifier: {run_uuid}.
""",
)

retval["workflow"] = init_xcpd_wf(
subject_list=subject_list,
name="xcpd_wf",
)
def build_boilerplate(config_file, workflow):
"""Write boilerplate in an isolated process."""
from xcp_d import config

boilerplate = retval["workflow"].visit_desc()
config.load(config_file)
logs_path = config.execution.output_dir / "xcp_d" / "logs"
boilerplate = workflow.visit_desc()
citation_files = {ext: logs_path / f"CITATION.{ext}" for ext in ("bib", "tex", "md", "html")}

if boilerplate:
citation_files = {ext: log_dir / f"CITATION.{ext}" for ext in ("bib", "tex", "md", "html")}
# To please git-annex users and also to guarantee consistency among different renderings
# of the same file, first remove any existing ones
# To please git-annex users and also to guarantee consistency
# among different renderings of the same file, first remove any
# existing one
for citation_file in citation_files.values():
try:
citation_file.unlink()
except FileNotFoundError:
pass

citation_files["md"].write_text(boilerplate)

build_log.log(
25,
(
"Works derived from this xcp_d execution should include the following boilerplate:\n\n"
f"{boilerplate}"
),
)

retval["return_code"] = 0

return retval
citation_files["md"].write_text(boilerplate)

if not config.execution.md_only_boilerplate and citation_files["md"].exists():
from shutil import copyfile
from subprocess import CalledProcessError, TimeoutExpired, check_call

from pkg_resources import resource_filename as pkgrf

# Generate HTML file resolving citations
cmd = [
"pandoc",
"-s",
"--bibliography",
pkgrf("xcp_d", "data/boilerplate.bib"),
"--filter",
"pandoc-citeproc",
"--metadata",
'pagetitle="XCP-D citation boilerplate"',
str(citation_files["md"]),
"-o",
str(citation_files["html"]),
]

config.loggers.cli.info("Generating an HTML version of the citation boilerplate...")
try:
check_call(cmd, timeout=10)
except (FileNotFoundError, CalledProcessError, TimeoutExpired):
config.loggers.cli.warning("Could not generate CITATION.html file:\n%s", " ".join(cmd))

# Generate LaTex file resolving citations
cmd = [
"pandoc",
"-s",
"--bibliography",
pkgrf("xcp_d", "data/boilerplate.bib"),
"--natbib",
str(citation_files["md"]),
"-o",
str(citation_files["tex"]),
]
config.loggers.cli.info("Generating a LaTeX version of the citation boilerplate...")
try:
check_call(cmd, timeout=10)
except (FileNotFoundError, CalledProcessError, TimeoutExpired):
config.loggers.cli.warning("Could not generate CITATION.tex file:\n%s", " ".join(cmd))
else:
copyfile(pkgrf("xcp_d", "data/boilerplate.bib"), citation_files["bib"])
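Note on the refactor as a whole: the commit message and both files point at the same change, moving settings from argparse namespaces into a module-level config object with sections such as config.execution, config.nipype, config.environment, and config.loggers, serialized with config.to_filename(config_file) and reloaded with config.load(config_file). A minimal, hypothetical sketch of that kind of config module (field names and the TOML layout are illustrative only, not the actual xcp_d.config implementation):

from pathlib import Path

import tomllib  # assumes Python 3.11+; earlier code might use toml/tomli instead


class _Section:
    """Each section is a class whose attributes are the settings."""

    @classmethod
    def load(cls, settings):
        # Only accept keys the section already declares.
        for key, value in settings.items():
            if not key.startswith("_") and hasattr(cls, key):
                setattr(cls, key, value)


class execution(_Section):
    fmri_dir = None
    output_dir = None
    work_dir = Path("work")


class nipype(_Section):
    nprocs = 1
    omp_nthreads = 1


def load(config_file):
    """Populate all sections from a TOML file (hypothetical layout)."""
    with open(config_file, "rb") as f:
        settings = tomllib.load(f)
    execution.load(settings.get("execution", {}))
    nipype.load(settings.get("nipype", {}))


# Usage: any module can import this module once and read shared settings
# directly (e.g. execution.output_dir), instead of threading an argparse
# namespace through every function call.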
