From 92fbaba094ef4c87c21a65f0191502b5aeda3f09 Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Wed, 23 Oct 2024 14:37:18 +0200 Subject: [PATCH] enh: write out all the results for potential off-line visualization Instead of making the script 1-off, let's keep the scores of all cross validation cycles. --- scripts/dwi_estimation_error_analysis.py | 223 +++-------------------- 1 file changed, 21 insertions(+), 202 deletions(-) diff --git a/scripts/dwi_estimation_error_analysis.py b/scripts/dwi_estimation_error_analysis.py index a35f77c0..b0031e13 100644 --- a/scripts/dwi_estimation_error_analysis.py +++ b/scripts/dwi_estimation_error_analysis.py @@ -29,13 +29,14 @@ from __future__ import annotations import argparse +from collections import defaultdict # import nibabel as nib import numpy as np +import pandas as pd from dipy.core.gradients import gradient_table from dipy.sims.voxel import single_tensor -from sklearn.metrics import root_mean_squared_error -from sklearn.model_selection import KFold, RepeatedKFold, cross_val_score +from sklearn.model_selection import RepeatedKFold, cross_val_score from eddymotion.model._sklearn import ( EddyMotionGPR, @@ -44,144 +45,6 @@ from eddymotion.testing import simulations as testsims -def create_random_train_mask(gtab: gradient_table, size: int, seed: int = 1234) -> np.ndarray: - """ - Create a random mask for the gradient table. - - Create a mask for the gradient table where a ``size`` number of indices will be - excluded. b0 values are excluded. - - Parameters - ---------- - gtab : :obj:`~dipy.core.gradients.gradient_table` - Gradient table. - size : :obj:`int` - Number of indices to exclude. - seed : :obj:`int`, optional - Seed for the random number generator, by default 1234. - - Returns - ------- - :obj:`~numpy.ndarray` - Training mask. - - Raises - ------ - ValueError - If the size of requested masked values is greater than available gradient vectors. - - """ - rng = np.random.default_rng(seed) - - # Get the indices of the non-zero diffusion-encoding gradient vector indices - nnzero_degv_idx = np.where(~gtab.b0s_mask)[0] - if nnzero_degv_idx.size < size: - raise ValueError( - f"Requested {size} values for masking; gradient table has {nnzero_degv_idx.size} " - "non-zero diffusion-encoding gradient vectors. Reduce the number of masked values." - ) - lo = rng.choice(nnzero_degv_idx, size=size, replace=False) - - # Exclude the b0s - zero_degv_idx = np.asarray(list(set(range(len(gtab.bvals))).difference(nnzero_degv_idx))) - lo = np.hstack([zero_degv_idx, lo]) - train_mask = np.ones(len(gtab.bvals), dtype=bool) - train_mask[lo] = False - return train_mask - - -def perform_experiment( - gtab: gradient_table, - S0: float, - evals1: np.ndarray, - evecs: np.ndarray, - snr: float, - repeats: int, - kfold: list[int], -) -> dict[int, list[tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]]]: - """ - Perform the experiment by estimating the dMRI signal using a Gaussian process model. - - Parameters - ---------- - gtab : :obj:`~dipy.core.gradients.gradient_table` - Gradient table. - S0 : :obj:`float` - S0 value. - evals1 : :obj:`~numpy.ndarray` - Eigenvalues of the tensor. - evecs : :obj:`~numpy.ndarray` - Eigenvectors of the tensor. - snr : :obj:`float` - Signal-to-noise ratio. - repeats : :obj:`int` - Number of repeats. - kfold : :obj:`list` - list of directions to leave out/predict. - - Returns - ------- - :obj:`dict` - Data for the predicted signal and its error. - - """ - - # Fix the random number generator for reproducibility when generating the - # signal - rng = np.random.default_rng(1234) - - # Define the Gaussian process model parameter - lambda_s = 2.0 - a = 1.0 - sigma_sq = 0.5 - - data = {} - nzero_bvecs = gtab.bvecs[~gtab.b0s_mask] - - # Simulate the fitting a number of times: every time the signal created will be a little - # different - # for _ in range(repeats): - # Create the DWI signal using a single tensor - signal = single_tensor(gtab, S0=S0, evals=evals1, evecs=evecs, snr=snr, rng=rng) - - import pdb - - pdb.set_trace() - - # Loop over the number of indices that are left out from the training/need to be predicted - for n in kfold: - # Assumptions: - # - Consecutive indices in the folds - # - A single b0 - kf = KFold(n_splits=n, shuffle=False) - - # Define the Gaussian process model instance - gp_model = EddyMotionGPR( - kernel=SphericalKriging(a=a, lambda_s=lambda_s), - alpha=sigma_sq, - optimizer=None, - ) - - _data = [] - for _, (train_index, test_index) in enumerate(kf.split(nzero_bvecs)): - # Create the training mask leaving out the requested number of samples - # train_mask = create_random_train_mask(gtab, n) - - # Fit the Gaussian process - # Add 1 to account for the b0 - gpfit = gp_model.fit(signal[train_index + 1], gtab[train_index + 1]) - - # Predict the signal - # X_qry, idx_qry = get_query_vectors(gtab, train_mask) - # Add 1 to account for the b0 - idx_qry = test_index + 1 - X_qry = gtab[idx_qry].bvecs - _y_pred, _y_std = gpfit.predict(X_qry) - _data.append((idx_qry, signal[idx_qry], _y_pred, _y_std)) - data.update({n: _data}) - return data - - def cross_validate( gtab: gradient_table, S0: float, @@ -230,52 +93,6 @@ def cross_validate( return scores -def compute_error( - data: dict[int, list[tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]]], - repeats: int, - kfolds: list[int], -) -> tuple[np.ndarray, np.ndarray]: - """ - Compute the error and standard deviation. - - Parameters - ---------- - data : :obj:`dict` - Data for the predicted signal and its error. - repeats : :obj:`int` - Number of repeats. - kfolds : :obj:`dict` - Number of k-folds. - - Returns - ------- - :obj:`tuple` - Mean RMSE and standard deviation. - - """ - mean_rmse = [] - std_dev = [] - - # Loop over the range of indices that were predicted - for vals in data.values(): - # repeats = 1 - # _data = np.array(vals[n * repeats : n * repeats + repeats]) - _signal = np.hstack([t[1] for t in vals]) - _pred = np.hstack([t[2] for t in vals]) - _rmse = root_mean_squared_error(_signal, _pred) - # ToDo - # Check here what is the value that we wil keep for the std - _std = np.hstack([t[3] for t in vals]) - _std_dev = np.mean(_std) - _std_dev = np.std( - [root_mean_squared_error([v1], [v2]) for v1, v2 in zip(_signal, _pred, strict=False)] - ) # np.std(_rmse) - mean_rmse.append(_rmse) - std_dev.append(_std_dev) - - return np.asarray(mean_rmse), np.asarray(std_dev) - - def _build_arg_parser() -> argparse.ArgumentParser: """ Build argument parser for command-line interface. @@ -333,22 +150,24 @@ def main() -> None: # Create a gradient table for a single-shell gtab = testsims.create_single_shell_gradient_table(args.hsph_dirs, args.bval_shell) - # Estimate the dMRI signal using a Gaussian process estimator - # ToDo - # Returning the index here is not useful as we cannot plot the signal as it - # changes on every fold and we do not return it. Maybe we can set a random - # value so that we return that one and we can plot it much like in the - # notebook or maybe we leave that for a separate script/notebook ?? - scores = { - n: cross_validate(gtab, args.S0, args.evals1, evecs, args.snr, n) - for n in args.kfold - for _ in range(args.repeats) - } - - print({n: (np.mean(scores[n]), np.std(scores[n])) for n in args.kfold}) - - # Compute the error - # rmse, std_dev = compute_error(data, args.repeats, args.kfold) + # Use Scikit-learn cross validation + scores = defaultdict(list, {}) + + for n in args.kfold: + for i in range(args.repeats): + cv_scores = -1.0 * cross_validate(gtab, args.S0, args.evals1, evecs, args.snr, n) + scores["rmse"] += cv_scores.tolist() + scores["repeat"] += [i] * len(cv_scores) + scores["n_folds"] += [n] * len(cv_scores) + + print(f"Finished {n}-fold cross-validation") + + scores_df = pd.DataFrame(scores) + scores_df.to_csv("cv_scores.tsv", sep="\t", index=None, na_rep="n/a") + + grouped = scores_df.groupby(["n_folds"]) + print(grouped[["rmse"]].mean()) + print(grouped[["rmse"]].std()) # Plot - import plot_* from eddymotion.viz.signals # xlabel = "N"