diff --git a/xcp_d/config.py b/xcp_d/config.py
index b0b3795e0..9ef3a57c0 100644
--- a/xcp_d/config.py
+++ b/xcp_d/config.py
@@ -1,20 +1,40 @@
 # emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
 # vi: set ft=python sts=4 ts=4 sw=4 et:
+#
+# Copyright 2023 The NiPreps Developers
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# We support and encourage derived works from this project, please read
+# about our expectations at
+#
+#     https://www.nipreps.org/community/licensing/
+#
 r"""
 A Python module to maintain unique, run-wide *XCP-D* settings.
 
 This module implements the memory structures to keep a consistent, singleton config.
 Settings are passed across processes via filesystem, and a copy of the settings for
 each run and subject is left under
-``/sub-/log//xcp_d.toml``.
+``/sub-/log//xcp_d.toml``.
 Settings are stored using :abbr:`ToML (Tom's Markup Language)`.
-The module has a :py:func:`~xcp_d.config.to_filename` function to allow writting out
+The module has a :py:func:`~xcp_d.config.to_filename` function to allow writing out
 the settings to hard disk in *ToML* format, which looks like:
 
-.. literalinclude:: ../../xcp_d/tests/data/config.toml
+.. literalinclude:: ../xcp_d/data/tests/config.toml
    :language: toml
    :name: xcp_d.toml
-   :caption: **Example file representation of xcp_d settings**.
+   :caption: **Example file representation of XCP-D settings**.
 
 This config file is used to pass the settings across processes, using the
 :py:func:`~xcp_d.config.load` function.
@@ -43,11 +63,12 @@
     config.to_filename(config_file)
     # Call build_workflow(config_file, retval) in a subprocess
     with Manager() as mgr:
-        from xcp_d.cli import build_workflow
+        from .workflow import build_workflow
         retval = mgr.dict()
         p = Process(target=build_workflow, args=(str(config_file), retval))
         p.start()
         p.join()
+        config.load(config_file)
 
 # Access configs from any code section as:
 value = config.section.setting
@@ -68,8 +89,16 @@
 :py:class:`~bids.layout.BIDSLayout`, etc.)
 
 """
+import os
 from multiprocessing import set_start_method
 
+# Disable NiPype etelemetry always
+_disable_et = bool(os.getenv("NO_ET") is not None or os.getenv("NIPYPE_NO_ET") is not None)
+os.environ["NIPYPE_NO_ET"] = "1"
+os.environ["NO_ET"] = "1"
+
+CONFIG_FILENAME = "xcp_d.toml"
+
 try:
     set_start_method("forkserver")
 except RuntimeError:
@@ -77,7 +106,6 @@
 finally:
     # Defer all custom import for after initializing the forkserver and
     # ignoring the most annoying warnings
-    import os
     import random
     import sys
     from pathlib import Path
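The module docstring above is the contract this whole patch serves: every section is serialized to a single TOML file, handed to child processes through the filesystem, and re-loaded there. A minimal sketch of that round trip, using only the helpers the docstring names (``config.to_filename`` and ``config.load``); the work-directory path is invented for the example, and ``execution.run_uuid`` is introduced further down in this patch:

    from pathlib import Path

    from xcp_d import config

    # Serialize the current state of every section to TOML
    config_file = Path("/tmp/work") / config.execution.run_uuid / "config.toml"
    config_file.parent.mkdir(parents=True, exist_ok=True)
    config.to_filename(config_file)

    # ... later, typically inside a worker process ...
    config.load(config_file)           # re-hydrate the singleton from disk
    value = config.execution.run_uuid  # sections are read as plain class attributes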
@@ -85,28 +113,27 @@
     from uuid import uuid4
 
     from nipype import __version__ as _nipype_ver
-    from nipype import logging as nlogging
     from templateflow import __version__ as _tf_ver
 
-    from xcp_d import __version__
+    from . import __version__
 
 if not hasattr(sys, "_is_pytest_session"):
     sys._is_pytest_session = False  # Trick to avoid sklearn's FutureWarnings
 
-# Disable all warnings in main and children processes only on production versions
 if not any(
     (
         "+" in __version__,
         __version__.endswith(".dirty"),
-        os.getenv("aslprep_DEV", "0").lower() in ("1", "on", "true", "y", "yes"),
+        os.getenv("FMRIPREP_DEV", "0").lower() in ("1", "on", "true", "y", "yes"),
     )
 ):
-    from xcp_d._warnings import logging
+    from ._warnings import logging
 
     os.environ["PYTHONWARNINGS"] = "ignore"
 
-elif os.getenv("aslprep_WARNINGS", "0").lower() in ("1", "on", "true", "y", "yes"):
+elif os.getenv("FMRIPREP_WARNINGS", "0").lower() in ("1", "on", "true", "y", "yes"):
     # allow disabling warnings on development versions
-    from xcp_d._warnings import logging
+    # https://github.com/nipreps/fmriprep/pull/2080#discussion_r409118765
+    from ._warnings import logging
 else:
     import logging
@@ -115,6 +142,19 @@
 DEFAULT_MEMORY_MIN_GB = 0.01
 
+# Ping NiPype eTelemetry once if env var was not set
+# workers on the pool will have the env variable set from the master process
+if not _disable_et:
+    # Just get so analytics track one hit
+    from contextlib import suppress
+
+    from requests import ConnectionError, ReadTimeout
+    from requests import get as _get_url
+
+    with suppress((ConnectionError, ReadTimeout)):
+        _get_url("https://rig.mit.edu/et/projects/nipy/nipype", timeout=0.05)
+
+# Execution environment
 _exec_env = os.name
 _docker_ver = None
 # special variable set in the container
@@ -140,7 +180,7 @@
 try:
     from psutil import virtual_memory
 
-    _free_mem_at_start = round(virtual_memory().free / 1024**3, 1)
+    _free_mem_at_start = round(virtual_memory().available / 1024**3, 1)
 except Exception:
     _free_mem_at_start = None
 
@@ -158,11 +198,16 @@
     if _proc_oc_kbytes.exists():
         _oc_limit = _proc_oc_kbytes.read_text().strip()
     if _oc_limit in ("0", "n/a") and Path("/proc/sys/vm/overcommit_ratio").exists():
-        _oc_limit = f"{Path('/proc/sys/vm/overcommit_ratio').read_text().strip()}%"
+        _oc_limit = "{}%".format(Path("/proc/sys/vm/overcommit_ratio").read_text().strip())
 except Exception:
     pass
 
 
+# Debug modes are names that influence the exposure of internal details to
+# the user, either through additional derivatives or increased verbosity
+DEBUG_MODES = ("compcor", "fieldmaps", "pdb")
+
+
 class _Config:
     """An abstract class forbidding instantiation."""
 
@@ -173,17 +218,15 @@ def __init__(self):
         raise RuntimeError("Configuration type is not instantiable.")
 
     @classmethod
-    def load(cls, settings, init=True):
+    def load(cls, settings, init=True, ignore=None):
         """Store settings from a dictionary."""
+        ignore = ignore or {}
         for k, v in settings.items():
-            if v is None:
+            if k in ignore or v is None:
                 continue
-
             if k in cls._paths:
                 setattr(cls, k, Path(v).absolute())
-                continue
-
-            if hasattr(cls, k):
+            elif hasattr(cls, k):
                 setattr(cls, k, v)
 
         if init:
@@ -195,19 +238,21 @@ def load(cls, settings, init=True):
     @classmethod
     def get(cls):
         """Return defined settings."""
+        from niworkflows.utils.spaces import Reference, SpatialReferences
+
         out = {}
         for k, v in cls.__dict__.items():
             if k.startswith("_") or v is None:
                 continue
-
             if callable(getattr(cls, k)):
                 continue
-
             if k in cls._paths:
                 v = str(v)
-
+            if isinstance(v, SpatialReferences):
+                v = " ".join([str(s) for s in v.references]) or None
+            if isinstance(v, Reference):
+                v = str(v) or None
             out[k] = v
-
         return out
 
 
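A note on the reworked ``_Config.load()`` above: the new ``ignore`` argument filters keys out before they are applied, ``None`` values and unknown attributes are skipped as before, and anything named in ``_paths`` is coerced to an absolute ``Path``. The following self-contained sketch mirrors that logic with a throwaway class (not a real XCP-D section) and omits the trailing ``cls.init()`` call the real method makes:

    from pathlib import Path


    class _demo:
        """Throwaway stand-in for a config section."""

        _paths = ("work_dir",)
        work_dir = None
        nprocs = 1

        @classmethod
        def load(cls, settings, ignore=None):
            ignore = ignore or {}
            for k, v in settings.items():
                if k in ignore or v is None:
                    continue
                if k in cls._paths:
                    setattr(cls, k, Path(v).absolute())
                elif hasattr(cls, k):
                    setattr(cls, k, v)


    _demo.load(
        {"work_dir": "work", "nprocs": 8, "unknown": 1, "memory_gb": None},
        ignore={"nprocs"},
    )
    assert isinstance(_demo.work_dir, Path)  # _paths entries become absolute Paths
    assert _demo.nprocs == 1                 # ignored keys keep their previous value
    assert not hasattr(_demo, "unknown")     # unknown keys are dropped silently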
@@ -216,7 +261,7 @@ class environment(_Config):
     Read-only options regarding the platform and environment.
 
     Crawls runtime descriptive settings (e.g., default FreeSurfer license,
-    execution environment, nipype and *xcp_d* versions, etc.).
+    execution environment, nipype and *XCP-D* versions, etc.).
     The ``environment`` section is not loaded in from file,
     only written out when settings are exported.
     This config section is useful when reporting issues,
@@ -281,7 +326,6 @@ def get_plugin(cls):
             out["plugin_args"]["n_procs"] = int(cls.nprocs)
         if cls.memory_gb:
             out["plugin_args"]["memory_gb"] = float(cls.memory_gb)
-
         return out
 
     @classmethod
@@ -310,6 +354,7 @@ def init(cls):
                     "crashfile_format": cls.crashfile_format,
                     "get_linked_libs": cls.get_linked_libs,
                     "stop_on_first_crash": cls.stop_on_first_crash,
+                    "check_version": False,  # disable future telemetry
                 }
             }
         )
@@ -351,6 +396,12 @@ class execution(_Config):
     """Path to a working directory where intermediate results will be available."""
     write_graph = False
     """Write out the computational graph corresponding to the planned postprocessing."""
+    bids_description_hash = None
+    """Checksum (SHA256) of the ``dataset_description.json`` of the BIDS dataset."""
+    reports_only = False
+    """Only build the reports, based on the reportlets found in a cached working directory."""
+    run_uuid = f"{strftime('%Y%m%d-%H%M%S')}_{uuid4()}"
+    """Unique identifier of this particular run."""
 
     _layout = None
 
@@ -374,23 +425,45 @@ def init(cls):
         import re
 
         from bids.layout import BIDSLayout
+        from bids.layout.index import BIDSLayoutIndexer
 
-        work_dir = cls.work_dir / "bids.db"
-        work_dir.mkdir(exist_ok=True, parents=True)
-        cls._layout = BIDSLayout(
-            str(cls.bids_dir),
+        _db_path = cls.work_dir / cls.run_uuid / "bids_db"
+        _db_path.mkdir(exist_ok=True, parents=True)
+
+        # Recommended after PyBIDS 12.1
+        _indexer = BIDSLayoutIndexer(
             validate=False,
-            # database_path=str(work_dir),
             ignore=(
                 "code",
                 "stimuli",
                 "sourcedata",
                 "models",
-                "derivatives",
                 re.compile(r"^\."),
+                re.compile(
+                    r"sub-[a-zA-Z0-9]+(/ses-[a-zA-Z0-9]+)?/(beh|dwi|eeg|ieeg|meg|perf)"
+                ),
             ),
         )
+        cls._layout = BIDSLayout(
+            str(cls.bids_dir),
+            database_path=_db_path,
+            reset_database=True,
+            indexer=_indexer,
+        )
+        cls.layout = cls._layout
+        if cls.bids_filters:
+            from bids.layout import Query
+
+            # unserialize pybids Query enum values
+            for acq, filters in cls.bids_filters.items():
+                cls.bids_filters[acq] = {
+                    k: getattr(Query, v[7:-4]) if not isinstance(v, Query) and "Query" in v else v
+                    for k, v in filters.items()
+                }
+
+        if "all" in cls.debug:
+            cls.debug = list(DEBUG_MODES)
 
         # These variables are not necessary anymore
@@ -406,6 +479,7 @@ def init(cls):
 
 class workflow(_Config):
     """Configure the particular execution graph of this workflow."""
 
+    analysis_level = "participant"
     # The BIDS App analysis level (only "participant" allowed)
     input_type = "fmriprep"
@@ -474,11 +548,11 @@ class loggers:
     """The root logger."""
     cli = logging.getLogger("cli")
     """Command-line interface logging."""
-    workflow = nlogging.getLogger("nipype.workflow")
+    workflow = logging.getLogger("nipype.workflow")
    """NiPype's workflow logger."""
-    interface = nlogging.getLogger("nipype.interface")
+    interface = logging.getLogger("nipype.interface")
     """NiPype's interface logger."""
-    utils = nlogging.getLogger("nipype.utils")
+    utils = logging.getLogger("nipype.utils")
     """NiPype's utils logger."""
 
     @classmethod
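The ``execution.init()`` hunk above replaces ad-hoc ``ignore`` arguments on ``BIDSLayout`` with an explicit ``BIDSLayoutIndexer`` and keeps the index database under a per-run directory. A standalone sketch of the same strategy with the PyBIDS API the patch uses (the comment in the hunk refers to PyBIDS 0.12.1 and later); the dataset root and working directory are placeholders:

    import re

    from bids.layout import BIDSLayout
    from bids.layout.index import BIDSLayoutIndexer

    indexer = BIDSLayoutIndexer(
        validate=False,
        ignore=(
            "code",
            "sourcedata",
            re.compile(r"^\."),
            # skip datatypes the postprocessing never reads, as the patch does
            re.compile(r"sub-[a-zA-Z0-9]+(/ses-[a-zA-Z0-9]+)?/(beh|dwi|eeg|ieeg|meg|perf)"),
        ),
    )
    layout = BIDSLayout(
        "/data/bids",                       # placeholder dataset root
        database_path="/tmp/work/bids_db",  # cached index, reusable across calls
        reset_database=True,                # force a rebuild for this run
        indexer=indexer,
    )
    print(layout.get_subjects())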
@@ -493,67 +567,111 @@ def init(cls):
 
         """
         from nipype import config as ncfg
 
-        _handler = logging.StreamHandler(stream=sys.stdout)
-        _handler.setFormatter(logging.Formatter(fmt=cls._fmt, datefmt=cls._datefmt))
-        cls.cli.addHandler(_handler)
+        if not cls.cli.hasHandlers():
+            _handler = logging.StreamHandler(stream=sys.stdout)
+            _handler.setFormatter(logging.Formatter(fmt=cls._fmt, datefmt=cls._datefmt))
+            cls.cli.addHandler(_handler)
         cls.default.setLevel(execution.log_level)
         cls.cli.setLevel(execution.log_level)
         cls.interface.setLevel(execution.log_level)
         cls.workflow.setLevel(execution.log_level)
         cls.utils.setLevel(execution.log_level)
         ncfg.update_config(
-            {
-                "logging": {"log_directory": str(execution.log_dir), "log_to_file": True},
-            }
+            {"logging": {"log_directory": str(execution.log_dir), "log_to_file": True}}
         )
 
 
 class seeds(_Config):
-    """Initialize the PRNG and track random seed assignments."""
+    """Initialize the PRNG and track random seed assignments"""
 
+    _random_seed = None
     master = None
-    """Master seed used to generate all other tracked seeds"""
+    """Master random seed to initialize the Pseudorandom Number Generator (PRNG)"""
     ants = None
     """Seed used for antsRegistration, antsAI, antsMotionCorr"""
+    numpy = None
+    """Seed used by NumPy"""
 
     @classmethod
     def init(cls):
-        """Initialize a seeds object."""
-        cls.master = workflow.random_seed
+        if cls._random_seed is not None:
+            cls.master = cls._random_seed
         if cls.master is None:
             cls.master = random.randint(1, 65536)
         random.seed(cls.master)  # initialize the PRNG
-        # functions to set program specific seeds
         cls.ants = _set_ants_seed()
+        cls.numpy = _set_numpy_seed()
 
 
 def _set_ants_seed():
-    """Fix random seed for antsRegistration, antsAI, antsMotionCorr."""
+    """Fix random seed for antsRegistration, antsAI, antsMotionCorr"""
     val = random.randint(1, 65536)
     os.environ["ANTS_RANDOM_SEED"] = str(val)
     return val
 
 
-def from_dict(settings):
-    """Read settings from a flat dictionary."""
-    nipype.load(settings)
-    execution.load(settings)
-    workflow.load(settings)
-    seeds.init()
+def _set_numpy_seed():
+    """NumPy's random seed is independent from Python's `random` module"""
+    import numpy as np
+
+    val = random.randint(1, 65536)
+    np.random.seed(val)
+    return val
+
+
+def from_dict(settings, init=True, ignore=None):
+    """Read settings from a flat dictionary.
+
+    Arguments
+    ---------
+    settings : dict
+        Settings to apply to any configuration
+    init : `bool` or :py:class:`~collections.abc.Container`
+        Initialize all, none, or a subset of configurations.
+    ignore : :py:class:`~collections.abc.Container`
+        Collection of keys in ``settings`` to ignore
+    """
+
+    # Accept global True/False or container of configs to initialize
+    def initialize(x):
+        return init if init in (True, False) else x in init
+
+    nipype.load(settings, init=initialize('nipype'), ignore=ignore)
+    execution.load(settings, init=initialize('execution'), ignore=ignore)
+    workflow.load(settings, init=initialize('workflow'), ignore=ignore)
+    seeds.load(settings, init=initialize('seeds'), ignore=ignore)
+
     loggers.init()
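The ``initialize()`` closure defined in ``from_dict()`` above (and again in ``load()`` below) is what lets callers pass either a plain boolean or a container of section names for ``init``. A minimal sketch of that dispatch outside the config module, with the real section names used for illustration:

    def initialize(init, name):
        # Booleans win outright; otherwise fall back to membership testing
        return init if init in (True, False) else name in init


    assert initialize(True, "execution") is True    # initialize everything
    assert initialize(False, "nipype") is False     # initialize nothing
    assert initialize(("execution", "workflow"), "workflow") is True
    assert initialize(("execution", "workflow"), "nipype") is False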
+ """ from toml import loads + skip = skip or {} + + # Accept global True/False or container of configs to initialize + def initialize(x): + return init if init in (True, False) else x in init + filename = Path(filename) settings = loads(filename.read_text()) for sectionname, configs in settings.items(): if sectionname != "environment": section = getattr(sys.modules[__name__], sectionname) - section.load(configs) + ignore = skip.get(sectionname) + section.load(configs, ignore=ignore, init=initialize(sectionname)) def get(flat=False):