Source code for eddymotion.model.gpr
+# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
+# vi: set ft=python sts=4 ts=4 sw=4 et:
+#
+# Copyright The NiPreps Developers <nipreps@gmail.com>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# We support and encourage derived works from this project, please read
+# about our expectations at
+#
+# https://www.nipreps.org/community/licensing/
+#
+"""Derivations from scikit-learn for Gaussian Processes."""
+
+from __future__ import annotations
+
+from numbers import Integral, Real
+from typing import Callable, Mapping, Sequence
+
+import numpy as np
+from scipy import optimize
+from scipy.optimize import Bounds
+from sklearn.gaussian_process import GaussianProcessRegressor
+from sklearn.gaussian_process.kernels import (
+ Hyperparameter,
+ Kernel,
+)
+from sklearn.metrics.pairwise import cosine_similarity
+from sklearn.utils._param_validation import Interval, StrOptions
+
+BOUNDS_A: tuple[float, float] = (0.1, 2.35)
+"""The limits for the parameter *a* (angular distance in rad)."""
+BOUNDS_LAMBDA: tuple[float, float] = (1e-3, 1000)
+"""The limits for the parameter λ (signal scaling factor)."""
+THETA_EPSILON: float = 1e-5
+"""Minimum nonzero angle."""
+LBFGS_CONFIGURABLE_OPTIONS = {"disp", "maxiter", "ftol", "gtol"}
+"""The set of extended options that can be set on the default L-BFGS-B optimizer."""
+CONFIGURABLE_OPTIONS: Mapping[str, set] = {
+ "Nelder-Mead": {"disp", "maxiter", "adaptive", "fatol"},
+ "CG": {"disp", "maxiter", "gtol"},
+}
+"""
+A mapping from optimizer names to the option set they allow.
+
+Add new optimizers to this mapping, including which options may be
+configured.
+"""
+NONGRADIENT_METHODS = {"Nelder-Mead"}
+"""A set of optimizers that do not support analytical gradients."""
+SUPPORTED_OPTIMIZERS = set(CONFIGURABLE_OPTIONS.keys()) | {"fmin_l_bfgs_b"}
+"""A set of supported optimizers (automatically created)."""
+
+
+class EddyMotionGPR(GaussianProcessRegressor):
+ r"""
+ A GP regressor specialized for eddymotion.
+
+ This specialization of the default GP regressor is created to allow
+ the following extended behaviors:
+
+ * Pacify Scikit-learn's estimator parameter checker to allow optimizers
+      given by name (as a string) other than the default ``fmin_l_bfgs_b``.
+ * Enable custom options of optimizers.
+ See :obj:`~scipy.optimize.minimize` for the available options.
+ Please note that only a few of them are currently supported.
+
+ In the future, this specialization would be the right place for hyperparameter
+ optimization using cross-validation and such.
+
+    In principle, Scikit-learn's implementation normalizes the training data
+ as in [Andersson15]_ (see
+    `FSL's source code <https://git.fmrib.ox.ac.uk/fsl/eddy/-/blob/2480dda293d4cec83014454db3a193b87921f6b0/DiffusionGP.cpp#L218>`__).
+ From their paper (p. 167, end of first column):
+
+        Typically one just subtracts the mean (:math:`\bar{\mathbf{f}}`)
+        from :math:`\mathbf{f}` and then adds it back to
+ :math:`f^{*}`, which is analogous to what is often done in
+ "traditional" regression.
+
+    Finally, the parameter :math:`\sigma^2` maps onto Scikit-learn's ``alpha``
+ of the regressor.
+ Because it is not a parameter of the kernel, hyperparameter selection
+ through gradient-descent with analytical gradient calculations
+ would not work (the derivative of the kernel w.r.t. alpha is zero).
+
+ I believe this is overlooked in [Andersson15]_, or they actually did not
+ use analytical gradient-descent:
+
+ *A note on optimisation*
+
+ It is suggested, for example in Rasmussen and Williams (2006), that
+ an optimisation method that uses derivative information should be
+ used when finding the hyperparameters that maximise Eq. (12).
+ The reason for that is that such methods typically use fewer steps, and
+ when the cost of calculating the derivatives is small/moderate compared
+ to calculating the functions itself (as is the case for Eq. (12)) then
+ execution time can be much shorter.
+ However, we found that for the multi-shell case a heuristic optimisation
+ method such as the Nelder-Mead simplex method (Nelder and Mead, 1965) was
+ frequently better at avoiding local maxima.
+ Hence, that was the method we used for all optimisations in the present
+ paper.
+
+ **Multi-shell regression (TODO).**
+ For multi-shell modeling, the kernel :math:`k(\textbf{x}, \textbf{x'})`
+ is updated following Eq. (14) in [Andersson15]_.
+
+ .. math::
+ k(\textbf{x}, \textbf{x'}) = C_{\theta}(\mathbf{g}, \mathbf{g'}; a) C_{b}(|b - b'|; \ell)
+
+    where :math:`C_{b}` is based on the log of the b-values ratio, a measure of distance along
+    the b-direction, given by Eq. (15):
+
+ .. math::
+ C_{b}(b, b'; \ell) = \exp\left( - \frac{(\log b - \log b')^2}{2 \ell^2} \right),
+
+ :math:`b` and :math:`b'` being the b-values, and :math:`\mathbf{g}` and
+    :math:`\mathbf{g'}` the unit diffusion-encoding gradient vectors of the
+ shells; and :math:`{a, \ell}` some hyperparameters.
+
+ The full GP regression kernel :math:`\mathbf{K}` is then updated for a 2-shell case as
+ follows (Eq. (16) in [Andersson15]_):
+
+ .. math::
+ \begin{equation}
+ \mathbf{K} = \left[
+ \begin{matrix}
+ \lambda C_{\theta}(\theta (\mathbf{G}_{1}); a) + \sigma_{1}^{2} \mathbf{I} &
+ \lambda C_{\theta}(\theta (\mathbf{G}_{2}, \mathbf{G}_{1}); a) C_{b}(b_{2}, b_{1}; \ell) \\
+ \lambda C_{\theta}(\theta (\mathbf{G}_{1}, \mathbf{G}_{2}); a) C_{b}(b_{1}, b_{2}; \ell) &
+ \lambda C_{\theta}(\theta (\mathbf{G}_{2}); a) + \sigma_{2}^{2} \mathbf{I} \\
+ \end{matrix}
+ \right]
+ \end{equation}
+
+ References
+ ----------
+    .. [Andersson15] J. L. R. Andersson et al., An integrated approach to
+        correction for off-resonance effects and subject movement in diffusion MR
+        imaging, NeuroImage 125 (2016) 1063-1078
+
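+    Examples
+    --------
+    A minimal usage sketch. The ``grads_*`` and ``signal_train`` arrays below are
+    hypothetical placeholders (unit gradient directions and the corresponding
+    signals), so the fitting lines are not executed as doctests; the kernel and
+    option values are illustrative, not recommendations:
+
+    >>> gpr = EddyMotionGPR(
+    ...     kernel=SphericalKriging(),
+    ...     alpha=0.5,
+    ...     optimizer="Nelder-Mead",
+    ...     maxiter=20,
+    ... )
+    >>> gpr = gpr.fit(grads_train, signal_train)  # doctest: +SKIP
+    >>> signal_pred = gpr.predict(grads_test)  # doctest: +SKIP
+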
+ """
+
+ _parameter_constraints: dict = {
+ "kernel": [None, Kernel],
+ "alpha": [Interval(Real, 0, None, closed="left"), np.ndarray],
+ "optimizer": [StrOptions(SUPPORTED_OPTIMIZERS), callable, None],
+ "n_restarts_optimizer": [Interval(Integral, 0, None, closed="left")],
+ "copy_X_train": ["boolean"],
+ "normalize_y": ["boolean"],
+ "n_targets": [Interval(Integral, 1, None, closed="left"), None],
+ "random_state": ["random_state"],
+ }
+
+ def __init__(
+ self,
+ kernel: Kernel | None = None,
+ *,
+ alpha: float = 0.5,
+ optimizer: str | Callable | None = "fmin_l_bfgs_b",
+ n_restarts_optimizer: int = 0,
+ copy_X_train: bool = True,
+ normalize_y: bool = True,
+ n_targets: int | None = None,
+ random_state: int | None = None,
+ eval_gradient: bool = True,
+ tol: float | None = None,
+ disp: bool | int | None = None,
+ maxiter: int | None = None,
+ ftol: float | None = None,
+ gtol: float | None = None,
+ adaptive: bool | int | None = None,
+ fatol: float | None = None,
+ ):
+ super().__init__(
+ kernel,
+ alpha=alpha,
+ optimizer=optimizer,
+ n_restarts_optimizer=n_restarts_optimizer,
+ normalize_y=normalize_y,
+ copy_X_train=copy_X_train,
+ n_targets=n_targets,
+ random_state=random_state,
+ )
+
+ self.tol = tol
+ self.eval_gradient = eval_gradient if optimizer not in NONGRADIENT_METHODS else False
+ self.maxiter = maxiter
+ self.disp = disp
+ self.ftol = ftol
+ self.gtol = gtol
+ self.adaptive = adaptive
+ self.fatol = fatol
+
+ def _constrained_optimization(
+ self,
+ obj_func: Callable,
+ initial_theta: np.ndarray,
+ bounds: Sequence[tuple[float, float]] | Bounds,
+    ) -> tuple[np.ndarray, float]:
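+        # For optimizers given by name, gather only the options the selected
+        # method supports before forwarding them to scipy.optimize.minimize.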
+ options = {}
+ if self.optimizer == "fmin_l_bfgs_b":
+ from sklearn.utils.optimize import _check_optimize_result
+
+ for name in LBFGS_CONFIGURABLE_OPTIONS:
+ if (value := getattr(self, name, None)) is not None:
+ options[name] = value
+
+ opt_res = optimize.minimize(
+ obj_func,
+ initial_theta,
+ method="L-BFGS-B",
+ bounds=bounds,
+ jac=self.eval_gradient,
+ options=options,
+ args=(self.eval_gradient,),
+ tol=self.tol,
+ )
+ _check_optimize_result("lbfgs", opt_res)
+ return opt_res.x, opt_res.fun
+
+ if isinstance(self.optimizer, str) and self.optimizer in CONFIGURABLE_OPTIONS:
+ for name in CONFIGURABLE_OPTIONS[self.optimizer]:
+ if (value := getattr(self, name, None)) is not None:
+ options[name] = value
+
+ opt_res = optimize.minimize(
+ obj_func,
+ initial_theta,
+ method=self.optimizer,
+ bounds=bounds,
+ jac=self.eval_gradient,
+ options=options,
+ args=(self.eval_gradient,),
+ tol=self.tol,
+ )
+ return opt_res.x, opt_res.fun
+
+ if callable(self.optimizer):
+ return self.optimizer(obj_func, initial_theta, bounds=bounds)
+
+ raise ValueError(f"Unknown optimizer {self.optimizer}.")
+
+
+class ExponentialKriging(Kernel):
+    """A scikit-learn kernel for DWI signals."""
+
+ def __init__(
+ self,
+ beta_a: float = 0.01,
+ beta_l: float = 2.0,
+ a_bounds: tuple[float, float] = BOUNDS_A,
+ l_bounds: tuple[float, float] = BOUNDS_LAMBDA,
+ ):
+ r"""
+ Initialize an exponential Kriging kernel.
+
+ Parameters
+ ----------
+ beta_a : :obj:`float`, optional
+            The :math:`a` hyperparameter of the covariance (an angular scale, in radians).
+ beta_l : :obj:`float`, optional
+ The :math:`\lambda` hyperparameter.
+ a_bounds : :obj:`tuple`, optional
+ Bounds for the a parameter.
+ l_bounds : :obj:`tuple`, optional
+ Bounds for the :math:`\lambda` hyperparameter.
+
+ """
+ self.beta_a = beta_a
+ self.beta_l = beta_l
+ self.a_bounds = a_bounds
+ self.l_bounds = l_bounds
+
+ @property
+ def hyperparameter_a(self) -> Hyperparameter:
+ return Hyperparameter("beta_a", "numeric", self.a_bounds)
+
+ @property
+ def hyperparameter_beta_l(self) -> Hyperparameter:
+ return Hyperparameter("beta_l", "numeric", self.l_bounds)
+
+ def __call__(
+ self, X: np.ndarray, Y: np.ndarray | None = None, eval_gradient: bool = False
+ ) -> np.ndarray | tuple[np.ndarray, np.ndarray]:
+ """
+ Return the kernel K(X, Y) and optionally its gradient.
+
+ Parameters
+ ----------
+ X : :obj:`~numpy.ndarray`
+ Gradient table (X)
+ Y : :obj:`~numpy.ndarray`, optional
+ Gradient table (Y, optional)
+ eval_gradient : :obj:`bool`, optional
+ Determines whether the gradient with respect to the log of
+ the kernel hyperparameter is computed.
+ Only supported when Y is ``None``.
+
+ Returns
+ -------
+ K : ndarray of shape (n_samples_X, n_samples_Y)
+ Kernel k(X, Y)
+
+ K_gradient : ndarray of shape (n_samples_X, n_samples_X, n_dims),\
+ optional
+ The gradient of the kernel k(X, X) with respect to the log of the
+            hyperparameter of the kernel. Only returned when ``eval_gradient``
+ is True.
+
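+        Examples
+        --------
+        A rounded spot check on two orthogonal unit vectors (hyperparameter
+        values chosen arbitrarily, for illustration only):
+
+        >>> X = np.array([[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]])
+        >>> K = ExponentialKriging(beta_a=1.0, beta_l=2.0)(X)
+        >>> float(np.round(K[0, 1], 3))
+        0.416
+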
+ """
+ thetas = compute_pairwise_angles(X, Y)
+ C_theta = exponential_covariance(thetas, self.beta_a)
+
+ if not eval_gradient:
+ return self.beta_l * C_theta
+
+ K_gradient = np.zeros((*thetas.shape, 2))
+ K_gradient[..., 0] = self.beta_l * C_theta * thetas / self.beta_a**2 # Derivative w.r.t. a
+ K_gradient[..., 1] = C_theta
+
+ return self.beta_l * C_theta, K_gradient
+
+ def diag(self, X: np.ndarray) -> np.ndarray:
+ """Returns the diagonal of the kernel k(X, X).
+
+ The result of this method is identical to np.diag(self(X)); however,
+ it can be evaluated more efficiently since only the diagonal is
+ evaluated.
+
+ Parameters
+ ----------
+ X : ndarray of shape (n_samples_X, n_features)
+ Left argument of the returned kernel k(X, Y)
+
+ Returns
+ -------
+ K_diag : ndarray of shape (n_samples_X,)
+ Diagonal of kernel k(X, X)
+ """
+ return self.beta_l * np.ones(X.shape[0])
+
+ def is_stationary(self) -> bool:
+ """Returns whether the kernel is stationary."""
+ return True
+
+ def __repr__(self) -> str:
+ return f"ExponentialKriging (a={self.beta_a}, λ={self.beta_l})"
+
+
+class SphericalKriging(Kernel):
+    """A scikit-learn kernel for DWI signals."""
+
+ def __init__(
+ self,
+ beta_a: float = 1.38,
+ beta_l: float = 0.5,
+ a_bounds: tuple[float, float] = BOUNDS_A,
+ l_bounds: tuple[float, float] = BOUNDS_LAMBDA,
+ ):
+ r"""
+ Initialize a spherical Kriging kernel.
+
+ Parameters
+ ----------
+ beta_a : :obj:`float`, optional
+            The :math:`a` hyperparameter of the covariance (an angular scale, in radians).
+ beta_l : :obj:`float`, optional
+ The :math:`\lambda` hyperparameter.
+ a_bounds : :obj:`tuple`, optional
+ Bounds for the ``a`` parameter.
+ l_bounds : :obj:`tuple`, optional
+ Bounds for the :math:`\lambda` hyperparameter.
+
+ """
+ self.beta_a = beta_a
+ self.beta_l = beta_l
+ self.a_bounds = a_bounds
+ self.l_bounds = l_bounds
+
+ @property
+ def hyperparameter_a(self) -> Hyperparameter:
+ return Hyperparameter("beta_a", "numeric", self.a_bounds)
+
+ @property
+ def hyperparameter_beta_l(self) -> Hyperparameter:
+ return Hyperparameter("beta_l", "numeric", self.l_bounds)
+
+ def __call__(
+ self, X: np.ndarray, Y: np.ndarray | None = None, eval_gradient: bool = False
+ ) -> np.ndarray | tuple[np.ndarray, np.ndarray]:
+ """
+ Return the kernel K(X, Y) and optionally its gradient.
+
+ Parameters
+ ----------
+ X : :obj:`~numpy.ndarray`
+ Gradient table (X)
+ Y : :obj:`~numpy.ndarray`, optional
+ Gradient table (Y, optional)
+ eval_gradient : :obj:`bool`, optional
+ Determines whether the gradient with respect to the log of
+ the kernel hyperparameter is computed.
+ Only supported when Y is ``None``.
+
+ Returns
+ -------
+ K : ndarray of shape (n_samples_X, n_samples_Y)
+ Kernel k(X, Y)
+
+ K_gradient : ndarray of shape (n_samples_X, n_samples_X, n_dims),\
+ optional
+ The gradient of the kernel k(X, X) with respect to the log of the
+ hyperparameter of the kernel. Only returned when ``eval_gradient``
+ is True.
+
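+        Examples
+        --------
+        A rounded spot check on two orthogonal unit vectors (hyperparameter
+        values chosen arbitrarily, for illustration only):
+
+        >>> X = np.array([[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]])
+        >>> K = SphericalKriging(beta_a=2.0, beta_l=1.0)(X)
+        >>> float(np.round(K[0, 1], 3))
+        0.064
+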
+ """
+ thetas = compute_pairwise_angles(X, Y)
+ C_theta = spherical_covariance(thetas, self.beta_a)
+
+ if not eval_gradient:
+ return self.beta_l * C_theta
+
+ deriv_a = np.zeros_like(thetas)
+ nonzero = thetas <= self.beta_a
+ deriv_a[nonzero] = (
+ 1.5
+ * self.beta_l
+ * (thetas[nonzero] / self.beta_a**2 - thetas[nonzero] ** 3 / self.beta_a**4)
+ )
+ K_gradient = np.dstack((deriv_a, C_theta))
+
+ return self.beta_l * C_theta, K_gradient
+
+ def diag(self, X: np.ndarray) -> np.ndarray:
+ """Returns the diagonal of the kernel k(X, X).
+
+ The result of this method is identical to np.diag(self(X)); however,
+ it can be evaluated more efficiently since only the diagonal is
+ evaluated.
+
+ Parameters
+ ----------
+ X : ndarray of shape (n_samples_X, n_features)
+ Left argument of the returned kernel k(X, Y)
+
+ Returns
+ -------
+ K_diag : ndarray of shape (n_samples_X,)
+ Diagonal of kernel k(X, X)
+ """
+ return self.beta_l * np.ones(X.shape[0])
+
+ def is_stationary(self) -> bool:
+ """Returns whether the kernel is stationary."""
+ return True
+
+ def __repr__(self) -> str:
+ return f"SphericalKriging (a={self.beta_a}, λ={self.beta_l})"
+
+
+def exponential_covariance(theta: np.ndarray, a: float) -> np.ndarray:
+ r"""
+ Compute the exponential covariance for given distances and scale parameter.
+
+ Implements :math:`C_{\theta}`, following Eq. (9) in [Andersson15]_:
+
+ .. math::
+ \begin{equation}
+ C(\theta) = e^{-\theta/a} \,\, \text{for} \, 0 \leq \theta \leq \pi,
+ \end{equation}
+
+ :math:`\theta` being computed as:
+
+ .. math::
+ \theta(\mathbf{g}, \mathbf{g'}) = \arccos(|\langle \mathbf{g}, \mathbf{g'} \rangle|)
+
+ Parameters
+ ----------
+ theta : :obj:`~numpy.ndarray`
+ Array of distances between points.
+ a : :obj:`float`
+ Scale parameter that controls the range of the covariance function.
+
+ Returns
+ -------
+ :obj:`~numpy.ndarray`
+ Exponential covariance values for the input distances.
+
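+    Examples
+    --------
+    A rounded numerical check of the formula (illustrative values only):
+
+    >>> float(np.round(exponential_covariance(np.array([1.0]), 2.0)[0], 4))
+    0.6065
+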
+ """
+ return np.exp(-theta / a)
+
+
+def spherical_covariance(theta: np.ndarray, a: float) -> np.ndarray:
+ r"""
+ Compute the spherical covariance for given distances and scale parameter.
+
+ Implements :math:`C_{\theta}`, following Eq. (10) in [Andersson15]_:
+
+ .. math::
+ \begin{equation}
+ C(\theta) =
+ \begin{cases}
+ 1 - \frac{3 \theta}{2 a} + \frac{\theta^3}{2 a^3} & \textnormal{if} \; \theta \leq a \\
+ 0 & \textnormal{if} \; \theta > a
+ \end{cases}
+ \end{equation}
+
+ :math:`\theta` being computed as:
+
+ .. math::
+ \theta(\mathbf{g}, \mathbf{g'}) = \arccos(|\langle \mathbf{g}, \mathbf{g'} \rangle|)
+
+ Parameters
+ ----------
+ theta : :obj:`~numpy.ndarray`
+ Array of distances between points.
+ a : :obj:`float`
+ Scale parameter that controls the range of the covariance function.
+
+ Returns
+ -------
+ :obj:`~numpy.ndarray`
+ Spherical covariance values for the input distances.
+
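+    Examples
+    --------
+    Values chosen so the results are exact (illustrative only):
+
+    >>> float(spherical_covariance(np.array([0.5]), 1.0)[0])
+    0.3125
+    >>> float(spherical_covariance(np.array([1.5]), 1.0)[0])
+    0.0
+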
+ """
+ return np.where(theta <= a, 1 - 1.5 * theta / a + 0.5 * (theta**3) / (a**3), 0.0)
+
+
+def compute_pairwise_angles(
+ X: np.ndarray,
+ Y: np.ndarray | None = None,
+ closest_polarity: bool = True,
+ dense_output: bool = True,
+) -> np.ndarray:
+ r"""Compute pairwise angles across diffusion gradient encoding directions.
+
+ Following [Andersson15]_, it computes the smallest of the angles between
+ each pair if ``closest_polarity`` is ``True``, i.e.,
+
+ .. math::
+
+ \theta(\mathbf{g}, \mathbf{g'}) = \arccos(|\langle \mathbf{g}, \mathbf{g'} \rangle|)
+
+ Parameters
+ ----------
+ X : {array-like, sparse matrix} of shape (n_samples_X, n_features)
+ Input data.
+    Y : {array-like, sparse matrix} of shape (n_samples_Y, n_features), optional
+        Input data. If ``None``, the output will be the pairwise
+        angles between all samples in ``X``.
+    closest_polarity : :obj:`bool`, optional
+        ``True`` to consider the smallest of the two angles between the crossing
+        lines resulting from reversing each vector pair.
+    dense_output : :obj:`bool`, optional
+        Whether to return dense output even when the input is sparse. If
+        ``False``, the output is sparse if both input arrays are sparse.
+
+ Returns
+ -------
+ :obj:`~numpy.ndarray`
+ Pairwise angles across diffusion gradient encoding directions.
+
+ Examples
+ --------
+ >>> X = np.asarray([(1.0, -1.0), (0.0, 0.0), (0.0, 0.0)]).T
+ >>> compute_pairwise_angles(X, closest_polarity=False)[0, 1] # doctest: +ELLIPSIS
+ 3.1415...
+ >>> X = np.asarray([(1.0, -1.0), (0.0, 0.0), (0.0, 0.0)]).T
+ >>> Y = np.asarray([(1.0, -1.0), (0.0, 0.0), (0.0, 0.0)]).T
+ >>> compute_pairwise_angles(X, Y, closest_polarity=False)[0, 1] # doctest: +ELLIPSIS
+ 3.1415...
+ >>> X = np.asarray([(1.0, -1.0), (0.0, 0.0), (0.0, 0.0)]).T
+ >>> compute_pairwise_angles(X)[0, 1]
+ 0.0
+
+ """
+
+ cosines = np.clip(cosine_similarity(X, Y, dense_output=dense_output), -1.0, 1.0)
+ thetas = np.arccos(np.abs(cosines)) if closest_polarity else np.arccos(cosines)
+ thetas[np.abs(thetas) < THETA_EPSILON] = 0.0
+ return thetas
+
+