From 308a921cf6bc819f52804b5161d14528412aa978 Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Fri, 18 Oct 2024 14:57:06 +0200 Subject: [PATCH] fix: deep revision and refactor of the GP modeling I tried to migrate the cross-validation to only use Scikit-learn CV utilities. At first, Scikit-learn wanted our ``eddymotion.model._dipy.GaussianProcessModel`` to be a sklearn's Estimator. That made me realize that the boundaries between Scikit-learn, DIPY, and eddymotion weren't clear. This commit: * Separates our old ``_dipy`` module into two modules, moving all Scikit-learn-related types into a new ``_sklearn`` module. * Type-annotates and completes docstrings of most of the code. * Updates the test script ``dwi_estimation_error_analysis.py`` to employ the new code. --- scripts/dwi_estimation_error_analysis.py | 36 +- src/eddymotion/model/_dipy.py | 488 +++-------------------- src/eddymotion/model/_sklearn.py | 435 ++++++++++++++++++++ 3 files changed, 502 insertions(+), 457 deletions(-) create mode 100644 src/eddymotion/model/_sklearn.py diff --git a/scripts/dwi_estimation_error_analysis.py b/scripts/dwi_estimation_error_analysis.py index 062189c4..5ce5d545 100644 --- a/scripts/dwi_estimation_error_analysis.py +++ b/scripts/dwi_estimation_error_analysis.py @@ -39,13 +39,14 @@ from dipy.core.sphere import HemiSphere, Sphere, disperse_charges from dipy.sims.voxel import all_tensor_evecs, single_tensor from matplotlib import pyplot as plt -from nireports.reportlets.modality.dwi import nii_to_carpetplot_data -from nireports.reportlets.nuisance import plot_carpet from scipy.stats import pearsonr from sklearn.metrics import root_mean_squared_error -from sklearn.model_selection import KFold, cross_val_score +from sklearn.model_selection import KFold, RepeatedKFold, cross_val_score -from eddymotion.model._dipy import GaussianProcessModel +from eddymotion.model._sklearn import ( + EddyMotionGPR, + SphericalKriging, +) def add_b0(bvals: np.ndarray, bvecs: np.ndarray) -> tuple[np.ndarray, np.ndarray]: @@ -300,7 +301,6 @@ def perform_experiment( rng = np.random.default_rng(1234) # Define the Gaussian process model parameter - kernel_model = "spherical" lambda_s = 2.0 a = 1.0 sigma_sq = 0.5 @@ -326,9 +326,12 @@ def perform_experiment( kf = KFold(n_splits=n, shuffle=False) # Define the Gaussian process model instance - gp_model = GaussianProcessModel( - kernel_model=kernel_model, lambda_s=lambda_s, a=a, sigma_sq=sigma_sq + gp_model = EddyMotionGPR( + kernel=SphericalKriging(a=a, lambda_s=lambda_s), + alpha=sigma_sq, + optimizer=None, ) + _data = [] for _, (train_index, test_index) in enumerate(kf.split(nzero_bvecs)): # Create the training mask leaving out the requested number of samples @@ -382,20 +385,18 @@ def cross_validate( """ - gp_params = { - "weighting": "exponential", - "lambda_s": 2.0, - "a": 1.0, - "sigma_sq": 2.0, - } - signal = single_tensor(gtab, S0=S0, evals=evals1, evecs=evecs, snr=snr) - gpm = GaussianProcessModel(**gp_params) + gpm = EddyMotionGPR( + kernel=SphericalKriging(a=2.15, lambda_s=120), + alpha=50, + optimizer=None, + ) X = gtab[~gtab.b0s_mask].bvecs y = signal[~gtab.b0s_mask] - scores = cross_val_score(gpm, X, y, scoring="neg_root_mean_squared_error", cv=cv) + rkf = RepeatedKFold(n_splits=cv, n_repeats=120 // cv) + scores = cross_val_score(gpm, X, y, scoring="neg_root_mean_squared_error", cv=rkf) return scores @@ -486,6 +487,9 @@ def plot_error( def plot_estimation_carpet(gt_nii, gp_nii, gtab, suptitle, **kwargs): + from nireports.reportlets.modality.dwi import nii_to_carpetplot_data + from nireports.reportlets.nuisance import plot_carpet + fig = plt.figure(layout="tight") gs = gridspec.GridSpec(ncols=1, nrows=2, figure=fig) fig.suptitle(suptitle) diff --git a/src/eddymotion/model/_dipy.py b/src/eddymotion/model/_dipy.py index 5e0127be..b4e1971d 100644 --- a/src/eddymotion/model/_dipy.py +++ b/src/eddymotion/model/_dipy.py @@ -1,7 +1,7 @@ # emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- # vi: set ft=python sts=4 ts=4 sw=4 et: # -# Copyright The NiPreps Developers +# © The NiPreps Developers # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -20,90 +20,26 @@ # # https://www.nipreps.org/community/licensing/ # -r""" -DIPY-like models (a sandbox to trial them out before upstreaming to DIPY). - -Gaussian Process Model: Pairwise orientation angles ---------------------------------------------------- -Squared Exponential covariance kernel -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Kernel based on a squared exponential function for Gaussian processes on -multi-shell DWI data following to eqs. 14 and 16 in [Andersson15]_. -For a 2-shell case, the :math:`\mathbf{K}` kernel can be written as: - -.. math:: - \begin{equation} - \mathbf{K} = \left[ - \begin{matrix} - \lambda C_{\theta}(\theta (\mathbf{G}_{1}); a) + \sigma_{1}^{2} \mathbf{I} & - \lambda C_{\theta}(\theta (\mathbf{G}_{2}, \mathbf{G}_{1}); a) C_{b}(b_{2}, b_{1}; \ell) \\ - \lambda C_{\theta}(\theta (\mathbf{G}_{1}, \mathbf{G}_{2}); a) C_{b}(b_{1}, b_{2}; \ell) & - \lambda C_{\theta}(\theta (\mathbf{G}_{2}); a) + \sigma_{2}^{2} \mathbf{I} \\ - \end{matrix} - \right] - \end{equation} - -**Squared exponential shell-wise covariance kernel**: -Compute the squared exponential smooth function describing how the -covariance changes along the b direction. -It uses the log of the b-values as the measure of distance along the -b-direction according to eq. 15 in [Andersson15]_. - -.. math:: - C_{b}(b, b'; \ell) = \exp\left( - \frac{(\log b - \log b')^2}{2 \ell^2} \right). - -**Squared exponential covariance kernel**: -Compute the squared exponential covariance matrix following to eq. 14 in [Andersson15]_. - -.. math:: - k(\textbf{x}, \textbf{x'}) = C_{\theta}(\mathbf{g}, \mathbf{g'}; a) C_{b}(|b - b'|; \ell) - -where :math:`C_{\theta}` is given by: - -.. math:: - \begin{equation} - C(\theta) = - \begin{cases} - 1 - \frac{3 \theta}{2 a} + \frac{\theta^3}{2 a^3} & \textnormal{if} \; \theta \leq a \\ - 0 & \textnormal{if} \; \theta > a - \end{cases} - \end{equation} - -:math:`\theta` being computed as: - -.. math:: - \theta(\mathbf{g}, \mathbf{g'}) = \arccos(|\langle \mathbf{g}, \mathbf{g'} \rangle|) - -and :math:`C_{b}` is given by: - -.. math:: - C_{b}(b, b'; \ell) = \exp\left( - \frac{(\log b - \log b')^2}{2 \ell^2} \right) - -:math:`b` and :math:`b'` being the b-values, and :math:`\mathbf{g}` and -:math:`\mathbf{g'}` the unit diffusion-encoding gradient unit vectors of the -shells; and :math:`{a, \ell}` some hyperparameters. - -""" +"""DIPY-like models (a sandbox to trial them out before upstreaming to DIPY).""" from __future__ import annotations import warnings -from sys import modules import numpy as np -from dipy.core.gradients import GradientTable, gradient_table +from dipy.core.gradients import GradientTable from dipy.reconst.base import ReconstModel from sklearn.gaussian_process import GaussianProcessRegressor -from sklearn.gaussian_process.kernels import ( - DotProduct, - Hyperparameter, - Kernel, - WhiteKernel, + +from eddymotion.model._sklearn import ( + EddyMotionGPR, + ExponentialKriging, + SphericalKriging, ) def gp_prediction( - X: np.ndarray, + gtab: GradientTable | np.ndarray, model: GaussianProcessRegressor, mask: np.ndarray | None = None, return_std: bool = False, @@ -112,14 +48,14 @@ def gp_prediction( Predicts one or more DWI orientations given a model. This function checks if the model is fitted and then extracts - orientations and potentially b-values from the gtab. It predicts the mean + orientations and potentially b-values from the X. It predicts the mean and standard deviation of the DWI signal using the model. Parameters ---------- model: :obj:`~sklearn.gaussian_process.GaussianProcessRegressor` A fitted GaussianProcessRegressor model. - gtab: :obj:`~dipy.core.gradients.GradientTable` + gtab : :obj:`~dipy.core.gradients.GradientTable` or :obj:`~np.ndarray` Gradient table with one or more orientations at which the GP will be evaluated. mask: :obj:`numpy.ndarray` A boolean mask indicating which voxels to use (optional). @@ -131,13 +67,15 @@ def gp_prediction( """ + X = gtab.bvecs.T if hasattr("bvecs") else np.asarray(gtab) + # Check it's fitted as they do in sklearn internally # https://github.com/scikit-learn/scikit-learn/blob/972e17fe1aa12d481b120ad4a3dc076bae736931/\ # sklearn/gaussian_process/_gpr.py#L410C9-L410C42 if not hasattr(model, "X_train_"): raise RuntimeError("Model is not yet fitted.") - # Extract orientations from gtab, and highly likely, the b-value too. + # Extract orientations from bvecs, and highly likely, the b-value too. return model.predict(X, return_std=return_std) @@ -155,8 +93,6 @@ def __init__( lambda_s: float = 2.0, a: float = 0.1, sigma_sq: float = 1.0, - lambda_s_bounds: tuple[float, float] = (1e-3, 1e3), - a_bounds: tuple[float, float] = (1e-3, np.pi), *args, **kwargs, ) -> None: @@ -164,7 +100,7 @@ def __init__( Parameters ---------- - kernel_model : :obj:`str` + kernel : :obj:`~sklearn.gaussian_process.kernels.Kernel` Kernel model to calculate the GP's covariance matrix. References @@ -178,23 +114,19 @@ def __init__( """ ReconstModel.__init__(self, None) - self.kernel = ( - PairwiseOrientationKernel( - weighting=kernel_model, - a=a, - lambda_s=lambda_s, - sigma_sq=sigma_sq, - lambda_s_bounds=lambda_s_bounds, - a_bounds=a_bounds, - ) - if kernel_model != "test" - else DotProduct() + WhiteKernel() + + self.sigma_sq = sigma_sq + + KernelType = SphericalKriging if kernel_model == "spherical" else ExponentialKriging + self.kernel = KernelType( + a=a, + lambda_s=lambda_s, ) def fit( self, - X: np.ndarray, - y: np.ndarray | None = None, + data: np.ndarray, + gtab: GradientTable | np.ndarray, mask: np.ndarray[bool] | None = None, random_state: int = 0, ) -> GPFit: @@ -202,8 +134,8 @@ def fit( Parameters ---------- - X : :obj:`~numpy.ndarray` - The gradient table corresponding to the training data (n_samples, n_features). + gtab : :obj:`~dipy.core.gradients.GradientTable` or :obj:`~np.ndarray` + The gradient table corresponding to the training data. y : :obj:`~numpy.ndarray` The measured signal from one voxel. mask : :obj:`~numpy.ndarray` @@ -217,18 +149,27 @@ def fit( """ - y = (y[mask[..., None]] if mask is not None else np.reshape(y, (-1, y.shape[-1]))).T + # Extract b-vecs: Scikit learn wants (n_samples, n_features) + # where n_features is 3, and n_samples the different diffusion orientations. + X = gtab.bvecs.T if hasattr("bvecs") else np.asarray(gtab) + + # Data must be shapes (n_samples, n_targets) where n_samples is + # the number of diffusion orientations, and n_targets is number of voxels. + y = ( + data[mask[..., None]] if mask is not None else np.reshape(data, (-1, data.shape[-1])) + ).T - # sklearn wants (n_samples, n_targets) for Y, where n_targets = n_voxels to simulate. if (grad_dirs := X.shape[0]) != (signal_dirs := y.shape[0]): raise ValueError( - f"Mismatched data {signal_dirs} and gradient table {grad_dirs} sizes." + f"Mismatched gradient directions in data ({signal_dirs}) " + f"and gradient table ({grad_dirs})." ) - gpr = GaussianProcessRegressor( + gpr = EddyMotionGPR( kernel=self.kernel, random_state=random_state, n_targets=y.shape[1], + alpha=self.sigma_sq, ) self._modelfit = GPFit( model=gpr.fit(X, y), @@ -236,13 +177,9 @@ def fit( ) return self._modelfit - # @multi_voxel_fit - # def multi_fit(self, data_thres, mask=None, **kwargs): - # return GPFit(gpr.fit(self.gtab, data_thres)) - def predict( self, - gtab: GradientTable, + gtab: GradientTable | np.ndarray, **kwargs, ) -> np.ndarray: """ @@ -250,8 +187,8 @@ def predict( Parameters ---------- - gtab : :obj:`~dipy.core.gradients.GradientTable` - One or more gradient orientations at which the GP will be evaluated. + gtab : :obj:`~dipy.core.gradients.GradientTable` or :obj:`~np.ndarray` + Gradient table with one or more orientations at which the GP will be evaluated. Returns ------- @@ -261,40 +198,6 @@ def predict( """ return self._modelfit.predict(gtab) - def get_params(self, deep=True): - """ - Get parameters of the kernel. - - Parameters - ---------- - deep : :obj:`bool` - Whether to return the parameters of the contained subobjects. - - Returns - ------- - params : :obj:`dict` - Parameter names mapped to their values. - - """ - return self.kernel.get_params(deep=deep) - - def set_params(self, **params): - """ - Set parameters of the kernel. - - Parameters - ---------- - params : :obj:`dict` - Kernel parameters. - - Returns - ------- - self : :obj:`object` - Returns self. - - """ - return self.kernel.set_params(**params) - class GPFit: """ @@ -305,8 +208,6 @@ class GPFit: Attributes ---------- - fitted_gtab : :obj:`~dipy.core.gradients.GradientTable` - The original gradient table with which the GP has been fitted. model: :obj:`~sklearn.gaussian_process.GaussianProcessRegressor` The fitted Gaussian process regressor object. mask: :obj:`~numpy.ndarray` @@ -335,14 +236,14 @@ def __init__( def predict( self, - X: np.ndarray, + gtab: GradientTable | np.ndarray, ) -> np.ndarray: """ Generate DWI signal based on a fitted Gaussian Process. Parameters ---------- - gtab: :obj:`~dipy.core.gradients.GradientTable` + gtab : :obj:`~dipy.core.gradients.GradientTable` or :obj:`~np.ndarray` Gradient table with one or more orientations at which the GP will be evaluated. Returns @@ -351,304 +252,7 @@ def predict( A 3D or 4D array with the simulated gradient(s). """ - return gp_prediction(X, self.model, mask=self.mask) - - -def _ensure_positive_scale( - a: float, -) -> None: - if a <= 0: - raise ValueError(f"a must be strictly positive. Provided: {a}") - - -def compute_exponential_covariance( - theta: np.ndarray, - a: float, -) -> np.ndarray: - r"""Compute the exponential covariance matrix following eq. 9 in [Andersson15]_. - - .. math:: - - C(\theta) = \exp(- \frac{\theta}{a}) - - Parameters - ---------- - theta : :obj:`~numpy.ndarray` - Pairwise angles across diffusion gradient encoding directions. - a : :obj:`float` - Positive scale parameter that here determines the "distance" at which θ - the covariance goes to zero. - - Returns - ------- - :obj:`~numpy.ndarray` - Exponential covariance function values. - - """ - - _ensure_positive_scale(a) - - return np.exp(-theta / a) - - -def compute_spherical_covariance( - theta: np.ndarray, - a: float, -) -> np.ndarray: - r"""Compute the spherical covariance matrix following eq. 10 in [Andersson15]_. - - .. math:: - - C(\theta) = \begin{cases} - 1 - \frac{3 \theta}{2 a} + \frac{\theta^3}{2 a^3} & \textnormal{if} \; \theta \leq a \\ - 0 & \textnormal{if} \; \theta > a - \end{cases} - - Parameters - ---------- - theta : :obj:`~numpy.ndarray` - Pairwise angles across diffusion gradient encoding directions. - a : :obj:`float` - Positive scale parameter that here determines the "distance" at which θ - the covariance goes to zero. - - Returns - ------- - :obj:`~numpy.ndarray` - Spherical covariance matrix. - - """ - - _ensure_positive_scale(a) - - return np.where(theta <= a, 1 - 3 * theta / (2 * a) + theta**3 / (2 * a**3), 0) - - -def compute_pairwise_angles( - gtab_X: GradientTable | np.ndarray, - gtab_Y: GradientTable | np.ndarray | None = None, - closest_polarity: bool = True, -) -> np.ndarray: - r"""Compute pairwise angles across diffusion gradient encoding directions. - - Following [Andersson15]_, it computes the smallest of the angles between - each pair if ``closest_polarity`` is ``True``, i.e. - - .. math:: - - \theta(\mathbf{g}, \mathbf{g'}) = \arccos(|\langle \mathbf{g}, \mathbf{g'} \rangle|) - - Parameters - ---------- - gtab_X: :obj:`~dipy.core.gradients.GradientTable` - Gradient table - gtab_Y: :obj:`~dipy.core.gradients.GradientTable` - Gradient table - closest_polarity : :obj:`bool` - ``True`` to consider the smallest of the two angles between the crossing - lines resulting from reversing each vector pair. - - Returns - ------- - :obj:`~numpy.ndarray` - Pairwise angles across diffusion gradient encoding directions. - - Examples - -------- - >>> from dipy.core.gradients import gradient_table - >>> bvecs = np.asarray([(1.0, -1.0), (0.0, 0.0), (0.0, 0.0)]) - >>> gtab = gradient_table([1000] * bvecs.shape[-1], bvecs) - >>> compute_pairwise_angles(gtab, closest_polarity=False)[0, 1] # doctest: +ELLIPSIS - 3.1415... - >>> bvecs1 = np.asarray([(1.0, -1.0), (0.0, 0.0), (0.0, 0.0)]) - >>> bvecs2 = np.asarray([(1.0, -1.0), (0.0, 0.0), (0.0, 0.0)]) - >>> gtab1 = gradient_table([1000] * bvecs1.shape[-1], bvecs1) - >>> gtab2 = gradient_table([1000] * bvecs2.shape[-1], bvecs2) - >>> compute_pairwise_angles(gtab1, gtab2, closest_polarity=False)[0, 1] # doctest: +ELLIPSIS - 3.1415... - >>> bvecs = np.asarray([(1.0, -1.0), (0.0, 0.0), (0.0, 0.0)]) - >>> gtab = gradient_table([1000] * bvecs.shape[-1], bvecs) - >>> compute_pairwise_angles(gtab)[0, 1] - 0.0 - - References - ---------- - .. [Andersson15] J. L. R. Andersson. et al., An integrated approach to - correction for off-resonance effects and subject movement in diffusion MR - imaging, NeuroImage 125 (2016) 1063–1078 - - """ - - bvecs_X = getattr(gtab_X, "bvecs", gtab_X) - bvecs_X = np.array(bvecs_X.T) / np.linalg.norm(bvecs_X, axis=1) - - if gtab_Y is None: - bvecs_Y = bvecs_X - else: - bvecs_Y = np.array(getattr(gtab_Y, "bvecs", gtab_Y)) - if bvecs_Y.ndim == 1: - bvecs_Y = bvecs_Y[np.newaxis, ...] - bvecs_Y = bvecs_Y.T / np.linalg.norm(bvecs_Y, axis=1) - - cosines = np.clip(bvecs_X.T @ bvecs_Y, -1.0, 1.0) - return np.arccos(np.abs(cosines) if closest_polarity else cosines) - - -class PairwiseOrientationKernel(Kernel): - """A scikit-learn's kernel for DWI signals.""" - - def __init__( - self, - weighting: str = "exponential", - lambda_s: float = 2.0, - a: float = 0.1, - sigma_sq: float = 1.0, - lambda_s_bounds: tuple[float, float] = (1e-3, 1e3), - a_bounds: tuple[float, float] = (1e-5, 0.5 * np.pi), - ): - r""" - Initialize a kernel with pairwise angles. - - Parameters - ---------- - weighting : :obj:`str` - The type of kernel to build (either "exponential", "sphere", or "test"). - lambda_s : :obj:`float` - The :math:`\lambda_s` hyperparameter. - a : :obj:`float` - Minimum angle in rads. - sigma_sq : :obj:`float` - Error allowed in collinear orientations. - lambda_s_bounds : :obj:`tuple` - Bounds for the :math:`\lambda_s` hyperparameter. - a_bounds : :obj:`tuple` - Bounds for the a parameter. - - """ - self.weighting = weighting # For __repr__ - self.lambda_s = lambda_s - self.a = a - self.sigma_sq = sigma_sq - self.lambda_s_bounds = lambda_s_bounds - self.a_bounds = a_bounds - - @property - def hyperparameter_lambda_s(self): - return Hyperparameter("lambda_s", "numeric", self.lambda_s_bounds) - - @property - def hyperparameter_a(self): - return Hyperparameter("a", "numeric", self.a_bounds) - - @property - def hyperparameter_sigma_sq(self): - return Hyperparameter("sigma_sq", "numeric", "fixed") - - def __call__(self, gtab_X, gtab_Y=None, eval_gradient=False): - """ - Return the kernel K(gtab_X, gtab_Y) and optionally its gradient. - - Parameters - ---------- - gtab_X: :obj:`~dipy.core.gradients.GradientTable` - Gradient table (X) - gtab_Y: :obj:`~dipy.core.gradients.GradientTable`, optional - Gradient table (Y, optional) - eval_gradient : :obj:`bool`, optional - Determines whether the gradient with respect to the log of - the kernel hyperparameter is computed. - Only supported when gtab_Y is ``None``. - - Returns - ------- - K : ndarray of shape (n_samples_X, n_samples_Y) - Kernel k(X, Y) - - K_gradient : ndarray of shape (n_samples_X, n_samples_X, n_dims),\ - optional - The gradient of the kernel k(X, X) with respect to the log of the - hyperparameter of the kernel. Only returned when `eval_gradient` - is True. - - """ - thetas = compute_pairwise_angles(gtab_X, gtab_Y) - collinear = np.abs(thetas) < 1e-5 - thetas[collinear] = 0.0 - - covfunc = getattr(modules[__name__], f"compute_{self.weighting}_covariance") - - K = self.lambda_s * covfunc(thetas, self.a) - K[collinear] += self.sigma_sq - - if not eval_gradient: - return K - - if gtab_Y is not None: - raise RuntimeError("Gradients should not be calculated in inference time") - - params = {p.name: self.get_params()[p.name] for p in self.hyperparameters if not p.fixed} - n_partials = len(params) - - a = params.pop("a") - lambda_s = params.pop("lambda_s") - - min_angles = thetas > a - - if self.weighting == "spherical": - deriv_a = 1.5 * (thetas[min_angles] / a**2 - thetas[min_angles] ** 3 / a**4) - elif self.weighting == "exponential": - deriv_a = np.exp(-thetas[min_angles] / a) * thetas[min_angles] / a**2 - else: - raise ValueError(f"Unknown kernel weighting '{self.weighting}'.") - - K_gradient = np.zeros((*thetas.shape, n_partials)) - - # Parameters are ordered alphabetically - K_gradient[..., 0][min_angles] = lambda_s * deriv_a # Derivative w.r.t. a - K_gradient[..., 1] = K / lambda_s # Derivative w.r.t. lambda_s - - # Gradient scaling - # K_gradient[np.abs(K_gradient) < 1e-5] = 0 - # maxg = np.max((np.abs(K_gradient).max(axis=(0, 1)), (1, 1)), axis=-1) - # K_gradient /= maxg - # print(K_gradient.mean(axis=(0, 1))) - - return K, K_gradient - - def diag(self, X): - """Returns the diagonal of the kernel k(X, X). - - The result of this method is identical to np.diag(self(X)); however, - it can be evaluated more efficiently since only the diagonal is - evaluated. - - Parameters - ---------- - X : ndarray of shape (n_samples_X, n_features) - Left argument of the returned kernel k(X, Y) - - Returns - ------- - K_diag : ndarray of shape (n_samples_X,) - Diagonal of kernel k(X, X) - """ - try: - n = len(X) - except TypeError: - n = len(X.bvals) - - covfunc = getattr(modules[__name__], f"compute_{self.weighting}_covariance") - return self.lambda_s * covfunc(np.zeros(n), self.a) + self.sigma_sq - - def is_stationary(self): - """Returns whether the kernel is stationary.""" - return True - - def __repr__(self): - return ( - f"{self.weighting} kernel" - f"(a={self.a}, lambda_s={self.lambda_s}, sigma_sq={self.sigma_sq})" - ) + return gp_prediction(gtab, self.model, mask=self.mask) def _rasb2dipy(gradient): @@ -664,6 +268,8 @@ def _rasb2dipy(gradient): print("Warning: make sure gradient information is not transposed!") with warnings.catch_warnings(): + from dipy.core.gradients import gradient_table + warnings.filterwarnings("ignore", category=UserWarning) retval = gradient_table(gradient[3, :], gradient[:3, :].T) return retval diff --git a/src/eddymotion/model/_sklearn.py b/src/eddymotion/model/_sklearn.py new file mode 100644 index 00000000..8e69cc6d --- /dev/null +++ b/src/eddymotion/model/_sklearn.py @@ -0,0 +1,435 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +# +# © The NiPreps Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# We support and encourage derived works from this project, please read +# about our expectations at +# +# https://www.nipreps.org/community/licensing/ +# +r""" +Derivations from scikit-learn for Gaussian Processes. + +Gaussian Process Model: Pairwise orientation angles +--------------------------------------------------- +Squared Exponential covariance kernel +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Kernel based on a squared exponential function for Gaussian processes on +multi-shell DWI data following to eqs. 14 and 16 in [Andersson15]_. +For a 2-shell case, the :math:`\mathbf{K}` kernel can be written as: + +.. math:: + \begin{equation} + \mathbf{K} = \left[ + \begin{matrix} + \lambda C_{\theta}(\theta (\mathbf{G}_{1}); a) + \sigma_{1}^{2} \mathbf{I} & + \lambda C_{\theta}(\theta (\mathbf{G}_{2}, \mathbf{G}_{1}); a) C_{b}(b_{2}, b_{1}; \ell) \\ + \lambda C_{\theta}(\theta (\mathbf{G}_{1}, \mathbf{G}_{2}); a) C_{b}(b_{1}, b_{2}; \ell) & + \lambda C_{\theta}(\theta (\mathbf{G}_{2}); a) + \sigma_{2}^{2} \mathbf{I} \\ + \end{matrix} + \right] + \end{equation} + +**Squared exponential shell-wise covariance kernel**: +Compute the squared exponential smooth function describing how the +covariance changes along the b direction. +It uses the log of the b-values as the measure of distance along the +b-direction according to eq. 15 in [Andersson15]_. + +.. math:: + C_{b}(b, b'; \ell) = \exp\left( - \frac{(\log b - \log b')^2}{2 \ell^2} \right). + +**Squared exponential covariance kernel**: +Compute the squared exponential covariance matrix following to eq. 14 in [Andersson15]_. + +.. math:: + k(\textbf{x}, \textbf{x'}) = C_{\theta}(\mathbf{g}, \mathbf{g'}; a) C_{b}(|b - b'|; \ell) + +where :math:`C_{\theta}` is given by: + +.. math:: + \begin{equation} + C(\theta) = + \begin{cases} + 1 - \frac{3 \theta}{2 a} + \frac{\theta^3}{2 a^3} & \textnormal{if} \; \theta \leq a \\ + 0 & \textnormal{if} \; \theta > a + \end{cases} + \end{equation} + +:math:`\theta` being computed as: + +.. math:: + \theta(\mathbf{g}, \mathbf{g'}) = \arccos(|\langle \mathbf{g}, \mathbf{g'} \rangle|) + +and :math:`C_{b}` is given by: + +.. math:: + C_{b}(b, b'; \ell) = \exp\left( - \frac{(\log b - \log b')^2}{2 \ell^2} \right) + +:math:`b` and :math:`b'` being the b-values, and :math:`\mathbf{g}` and +:math:`\mathbf{g'}` the unit diffusion-encoding gradient unit vectors of the +shells; and :math:`{a, \ell}` some hyperparameters. + +""" + +from __future__ import annotations + +from typing import Callable, Sequence + +import numpy as np +from scipy import optimize +from scipy.optimize._minimize import Bounds +from sklearn.gaussian_process import GaussianProcessRegressor +from sklearn.gaussian_process.kernels import ( + Hyperparameter, + Kernel, +) +from sklearn.metrics.pairwise import cosine_similarity + +BOUNDS_A: tuple[float, float] = (1e-4, np.pi) +BOUNDS_LAMBDA_S: tuple[float, float] = (1e-4, 1e4) +THETA_EPSILON: float = 1e-5 + + +class EddyMotionGPR(GaussianProcessRegressor): + def __init__( + self, + kernel: Kernel | None = None, + *, + alpha: float = 1e-10, + optimizer: str | Callable | None = "fmin_l_bfgs_b", + n_restarts_optimizer: int = 0, + normalize_y: bool = True, + copy_X_train: bool = True, + n_targets: int | None = None, + random_state: int | None = None, + max_iter: int = 2e05, + gtol: float = 1e-06, + ): + self.max_iter = max_iter + self.gtol = gtol + + super().__init__( + kernel, + alpha=alpha, + optimizer=optimizer, + n_restarts_optimizer=n_restarts_optimizer, + normalize_y=normalize_y, + copy_X_train=copy_X_train, + n_targets=n_targets, + random_state=random_state, + ) + + def _constrained_optimization( + self, + obj_func: Callable, + initial_theta: np.ndarray, + bounds: Sequence[tuple[float, float]] | Bounds, + ) -> tuple[float, float]: + from sklearn.utils.optimize import _check_optimize_result + + opt_res = optimize.minimize( + obj_func, + initial_theta, + method="L-BFGS-B", + jac=True, + bounds=bounds, + options={"maxiter": self.max_iter, "gtol": self.gtol}, + ) + _check_optimize_result("lbfgs", opt_res) + return opt_res.x, opt_res.fun + + +class ExponentialKriging(Kernel): + """A scikit-learn's kernel for DWI signals.""" + + def __init__( + self, + a: float = 0.01, + lambda_s: float = 2.0, + a_bounds: tuple[float, float] = BOUNDS_A, + lambda_s_bounds: tuple[float, float] = BOUNDS_LAMBDA_S, + ): + r""" + Initialize an exponential Kriging kernel. + + Parameters + ---------- + a : :obj:`float` + Minimum angle in rads. + lambda_s : :obj:`float` + The :math:`\lambda_s` hyperparameter. + a_bounds : :obj:`tuple` + Bounds for the a parameter. + lambda_s_bounds : :obj:`tuple` + Bounds for the :math:`\lambda_s` hyperparameter. + + """ + self.a = a + self.lambda_s = lambda_s + self.a_bounds = a_bounds + self.lambda_s_bounds = lambda_s_bounds + + @property + def hyperparameter_a(self) -> Hyperparameter: + return Hyperparameter("a", "numeric", self.a_bounds) + + @property + def hyperparameter_lambda_s(self) -> Hyperparameter: + return Hyperparameter("lambda_s", "numeric", self.lambda_s_bounds) + + def __call__( + self, X: np.ndarray, Y: np.ndarray | None = None, eval_gradient: bool = False + ) -> np.ndarray | tuple[np.ndarray, np.ndarray]: + """ + Return the kernel K(X, Y) and optionally its gradient. + + Parameters + ---------- + X: :obj:`~numpy.ndarray` + Gradient table (X) + Y: :obj:`~numpy.ndarray`, optional + Gradient table (Y, optional) + eval_gradient : :obj:`bool`, optional + Determines whether the gradient with respect to the log of + the kernel hyperparameter is computed. + Only supported when Y is ``None``. + + Returns + ------- + K : ndarray of shape (n_samples_X, n_samples_Y) + Kernel k(X, Y) + + K_gradient : ndarray of shape (n_samples_X, n_samples_X, n_dims),\ + optional + The gradient of the kernel k(X, X) with respect to the log of the + hyperparameter of the kernel. Only returned when `eval_gradient` + is True. + + """ + thetas = compute_pairwise_angles(X, Y) + thetas[np.abs(thetas) < THETA_EPSILON] = 0.0 + C_theta = np.exp(-thetas / self.a) + + if not eval_gradient: + return self.lambda_s * C_theta + + K_gradient = np.zeros((*thetas.shape, 2)) + K_gradient[..., 0] = self.lambda_s * C_theta * thetas / self.a**2 # Derivative w.r.t. a + K_gradient[..., 1] = C_theta + + return self.lambda_s * C_theta, K_gradient + + def diag(self, X: np.ndarray) -> np.ndarray: + """Returns the diagonal of the kernel k(X, X). + + The result of this method is identical to np.diag(self(X)); however, + it can be evaluated more efficiently since only the diagonal is + evaluated. + + Parameters + ---------- + X : ndarray of shape (n_samples_X, n_features) + Left argument of the returned kernel k(X, Y) + + Returns + ------- + K_diag : ndarray of shape (n_samples_X,) + Diagonal of kernel k(X, X) + """ + return self.lambda_s * np.ones(X.shape[0]) + + def is_stationary(self) -> bool: + """Returns whether the kernel is stationary.""" + return True + + def __repr__(self) -> str: + return f"ExponentialKriging (a={self.a}, λₛ={self.lambda_s})" + + +class SphericalKriging(Kernel): + """A scikit-learn's kernel for DWI signals.""" + + def __init__( + self, + a: float = 0.01, + lambda_s: float = 2.0, + a_bounds: tuple[float, float] = BOUNDS_A, + lambda_s_bounds: tuple[float, float] = BOUNDS_LAMBDA_S, + ): + r""" + Initialize a spherical Kriging kernel. + + Parameters + ---------- + a : :obj:`float` + Minimum angle in rads. + lambda_s : :obj:`float` + The :math:`\lambda_s` hyperparameter. + a_bounds : :obj:`tuple` + Bounds for the a parameter. + lambda_s_bounds : :obj:`tuple` + Bounds for the :math:`\lambda_s` hyperparameter. + + """ + self.a = a + self.lambda_s = lambda_s + self.a_bounds = a_bounds + self.lambda_s_bounds = lambda_s_bounds + + @property + def hyperparameter_a(self) -> Hyperparameter: + return Hyperparameter("a", "numeric", self.a_bounds) + + @property + def hyperparameter_lambda_s(self) -> Hyperparameter: + return Hyperparameter("lambda_s", "numeric", self.lambda_s_bounds) + + def __call__( + self, X: np.ndarray, Y: np.ndarray | None = None, eval_gradient: bool = False + ) -> np.ndarray | tuple[np.ndarray, np.ndarray]: + """ + Return the kernel K(X, Y) and optionally its gradient. + + Parameters + ---------- + X: :obj:`~numpy.ndarray` + Gradient table (X) + Y: :obj:`~numpy.ndarray`, optional + Gradient table (Y, optional) + eval_gradient : :obj:`bool`, optional + Determines whether the gradient with respect to the log of + the kernel hyperparameter is computed. + Only supported when Y is ``None``. + + Returns + ------- + K : ndarray of shape (n_samples_X, n_samples_Y) + Kernel k(X, Y) + + K_gradient : ndarray of shape (n_samples_X, n_samples_X, n_dims),\ + optional + The gradient of the kernel k(X, X) with respect to the log of the + hyperparameter of the kernel. Only returned when `eval_gradient` + is True. + + """ + thetas = compute_pairwise_angles(X, Y) + thetas[np.abs(thetas) < THETA_EPSILON] = 0.0 + + nonzero = thetas <= self.a + + C_theta = np.zeros_like(thetas) + C_theta[nonzero] = ( + 1 - 1.5 * thetas[nonzero] / self.a + 0.5 * thetas[nonzero] ** 3 / self.a**3 + ) + + if not eval_gradient: + return self.lambda_s * C_theta + + deriv_a = np.zeros_like(thetas) + deriv_a[nonzero] = ( + 1.5 * self.lambda_s * (thetas[nonzero] / self.a**2 - thetas[nonzero] ** 3 / self.a**4) + ) + K_gradient = np.dstack((deriv_a, C_theta)) + + return self.lambda_s * C_theta, K_gradient + + def diag(self, X: np.ndarray) -> np.ndarray: + """Returns the diagonal of the kernel k(X, X). + + The result of this method is identical to np.diag(self(X)); however, + it can be evaluated more efficiently since only the diagonal is + evaluated. + + Parameters + ---------- + X : ndarray of shape (n_samples_X, n_features) + Left argument of the returned kernel k(X, Y) + + Returns + ------- + K_diag : ndarray of shape (n_samples_X,) + Diagonal of kernel k(X, X) + """ + return self.lambda_s * np.ones(X.shape[0]) + + def is_stationary(self) -> bool: + """Returns whether the kernel is stationary.""" + return True + + def __repr__(self) -> str: + return f"SphericalKriging (a={self.a}, λₛ={self.lambda_s})" + + +def compute_pairwise_angles( + X: np.ndarray, + Y: np.ndarray | None = None, + closest_polarity: bool = True, + dense_output: bool = True, +) -> np.ndarray: + r"""Compute pairwise angles across diffusion gradient encoding directions. + + Following [Andersson15]_, it computes the smallest of the angles between + each pair if ``closest_polarity`` is ``True``, i.e., + + .. math:: + + \theta(\mathbf{g}, \mathbf{g'}) = \arccos(|\langle \mathbf{g}, \mathbf{g'} \rangle|) + + Parameters + ---------- + X : {array-like, sparse matrix} of shape (n_samples_X, n_features) + Input data. + Y : {array-like, sparse matrix} of shape (n_samples_Y, n_features), \ + default=None + Input data. If ``None``, the output will be the pairwise + similarities between all samples in ``X``. + dense_output : :obj:`bool`, default=True + Whether to return dense output even when the input is sparse. If + ``False``, the output is sparse if both input arrays are sparse. + closest_polarity : :obj:`bool`, default=True + ``True`` to consider the smallest of the two angles between the crossing + lines resulting from reversing each vector pair. + + Returns + ------- + :obj:`~numpy.ndarray` + Pairwise angles across diffusion gradient encoding directions. + + Examples + -------- + >>> X = np.asarray([(1.0, -1.0), (0.0, 0.0), (0.0, 0.0)]).T + >>> compute_pairwise_angles(X, closest_polarity=False)[0, 1] # doctest: +ELLIPSIS + 3.1415... + >>> X = np.asarray([(1.0, -1.0), (0.0, 0.0), (0.0, 0.0)]).T + >>> Y = np.asarray([(1.0, -1.0), (0.0, 0.0), (0.0, 0.0)]).T + >>> compute_pairwise_angles(X, Y, closest_polarity=False)[0, 1] # doctest: +ELLIPSIS + 3.1415... + >>> X = np.asarray([(1.0, -1.0), (0.0, 0.0), (0.0, 0.0)]).T + >>> compute_pairwise_angles(X)[0, 1] + 0.0 + + References + ---------- + .. [Andersson15] J. L. R. Andersson. et al., An integrated approach to + correction for off-resonance effects and subject movement in diffusion MR + imaging, NeuroImage 125 (2016) 1063-11078 + + """ + + cosines = np.clip(cosine_similarity(X, Y, dense_output=dense_output), -1.0, 1.0) + return np.arccos(np.abs(cosines)) if closest_polarity else np.arccos(cosines)