From f863c4bcb25439d36c1c4502bfa9a4bdeeb89e9a Mon Sep 17 00:00:00 2001 From: Jordan Deklerk <111652310+jordandeklerk@users.noreply.github.com> Date: Wed, 23 Oct 2024 22:50:28 -0400 Subject: [PATCH] Make estimators scikit-learn compatible and create new tests --- src/blockridge.py | 820 ++++++++++++++++----------------- test/test_blockridge.py | 294 ++++++------ test/test_blockridge_scikit.py | 344 ++++++++++++++ 3 files changed, 882 insertions(+), 576 deletions(-) create mode 100644 test/test_blockridge_scikit.py diff --git a/src/blockridge.py b/src/blockridge.py index 29da697..1356c6c 100644 --- a/src/blockridge.py +++ b/src/blockridge.py @@ -1,9 +1,11 @@ -"""Regression estimators based on group ridge regression.""" +"""Group Ridge regression estimators.""" from abc import ABC, abstractmethod -from typing import Union, TypeVar +from typing import Union, TypeVar, Dict, Any import numpy as np from scipy.linalg import cho_solve +from sklearn.base import BaseEstimator, RegressorMixin +from sklearn.utils.validation import check_X_y, check_array from groupedfeatures import GroupedFeatures from nnls import nonneg_lsq, NNLSError import warnings @@ -13,29 +15,25 @@ class RidgeRegressionError(Exception): """Base exception class for Ridge regression errors.""" - pass class InvalidDimensionsError(RidgeRegressionError): """Exception raised when matrix dimensions are incompatible.""" - pass class SingularMatrixError(RidgeRegressionError): """Exception raised when a matrix is singular or nearly singular.""" - pass class NumericalInstabilityError(RidgeRegressionError): """Exception raised when numerical instability is detected.""" - pass -class AbstractRidgePredictor(ABC): +class BaseRidgePredictor(ABC): r"""Abstract base class for Ridge regression predictors. This class defines the interface that all concrete Ridge predictors must @@ -46,30 +44,30 @@ class AbstractRidgePredictor(ABC): Ridge regression seeks to minimize the following objective function: .. math:: - \min_{\beta} \|Y - X\beta\|_2^2 + \lambda\|\beta\|_2^2 + \min_{\beta} \|y - X\beta\|_2^2 + \alpha\|\beta\|_2^2 where: - - :math:`Y` is the response vector with dimensions :math:`(n, )`. - - :math:`X` is the design matrix with dimensions :math:`(n, p)`. - - :math:`\beta` is the coefficient vector with dimensions :math:`(p, )`. - - :math:`\lambda` is the regularization parameter. + - :math:`y` is the target vector with dimensions :math:`(n_{\text{samples}}, )`. + - :math:`X` is the design matrix with dimensions :math:`(n_{\text{samples}}, n_{\text{features}})`. + - :math:`\beta` is the coefficient vector with dimensions :math:`(n_{\text{features}}, )`. + - :math:`\alpha` is the regularization parameter. The solution to this optimization problem is given by: .. math:: - \beta = (X^T X + \lambda I_p)^{-1} X^T Y + \beta = (X^T X + \alpha I_p)^{-1} X^T y """ @abstractmethod - def update_lambda_s(self, groups: GroupedFeatures, lambdas: np.ndarray): - r"""Update the regularization parameters (:math:`\lambda`) based on feature groups. + def set_params(self, groups: GroupedFeatures, alpha: np.ndarray): + r"""Update the regularization parameters (:math:`\alpha`) based on feature groups. Parameters ---------- groups : GroupedFeatures The grouped features object defining feature groupings. - lambdas : np.ndarray - The new :math:`\lambda` values for each group. + alpha : np.ndarray + The new :math:`\alpha` values for each group. 
Returns ------- @@ -78,19 +76,19 @@ def update_lambda_s(self, groups: GroupedFeatures, lambdas: np.ndarray): pass @abstractmethod - def trace_XtX(self) -> float: - """Compute the trace of :math:`X^T X`. + def _trace_gram_matrix(self) -> float: + """Compute the trace of the Gram matrix :math:`X^T X`. Returns ------- float - The trace of :math:`X^T X`. + The trace of the Gram matrix. """ pass @abstractmethod - def XtXp_lambda_ldiv_XtX(self) -> np.ndarray: - r"""Compute :math:`(X^T X + \Lambda)^{-1} X^T X`. + def _solve_gram_system(self) -> np.ndarray: + r"""Compute :math:`(X^T X + \alpha I)^{-1} X^T X`. Returns ------- @@ -100,8 +98,8 @@ def XtXp_lambda_ldiv_XtX(self) -> np.ndarray: pass @abstractmethod - def ldiv(self, B: np.ndarray) -> np.ndarray: - r"""Solve the linear system :math:`(X^T X + \Lambda) x = B`. + def _solve_system(self, B: np.ndarray) -> np.ndarray: + r"""Solve the linear system :math:`(X^T X + \alpha I) x = B`. Parameters ---------- @@ -116,26 +114,30 @@ def ldiv(self, B: np.ndarray) -> np.ndarray: pass -class CholeskyRidgePredictor(AbstractRidgePredictor): +class CholeskyRidgePredictor(BaseRidgePredictor): r"""Ridge predictor using Cholesky decomposition for efficient matrix inversion. - Suitable for scenarios where the number of features (:math:`p`) is less than the number - of samples (:math:`n`), leveraging the properties of Cholesky decomposition to solve - the Ridge regression problem efficiently. + Suitable for scenarios where the number of features is less than the number + of samples. + + Parameters + ---------- + X : np.ndarray of shape (n_samples, n_features) + Training data. Attributes ---------- - n : int + n_samples_ : int Number of samples. - p : int + n_features_ : int Number of features. - XtX : np.ndarray - The :math:`X^T X` matrix scaled by the number of samples. - XtXp_lambda : np.ndarray - The augmented matrix (:math:`X^T X + \Lambda`). - XtXp_lambda_chol : np.ndarray - The Cholesky decomposition of (:math:`X^T X + \Lambda`). - lower : bool + gram_matrix_ : np.ndarray of shape (n_features, n_features) + The Gram matrix :math:`X^T X` scaled by n_samples. + gram_reg_ : np.ndarray of shape (n_features, n_features) + The regularized Gram matrix :math:`X^T X + \alpha I`. + gram_reg_chol_ : np.ndarray of shape (n_features, n_features) + The Cholesky decomposition of gram_reg_. + lower_ : bool Indicates if the Cholesky factor is lower triangular. """ @@ -144,8 +146,8 @@ def __init__(self, X: np.ndarray): Parameters ---------- - X : np.ndarray - The design matrix. + X : np.ndarray of shape (n_samples, n_features) + Training data. Raises ------ @@ -159,14 +161,14 @@ def __init__(self, X: np.ndarray): if np.any(np.isnan(X)) or np.any(np.isinf(X)): raise ValueError("X contains NaN or infinity values.") - self.n, self.p = X.shape - self.XtX = np.dot(X.T, X) / self.n - self.XtXp_lambda = self.XtX + np.eye(self.p) # Initialize with identity matrix - self.update_cholesky() - self.lower = True + self.n_samples_, self.n_features_ = X.shape + self.gram_matrix_ = np.dot(X.T, X) / self.n_samples_ + self.gram_reg_ = self.gram_matrix_ + np.eye(self.n_features_) + self._update_cholesky() + self.lower_ = True - def update_cholesky(self): - r"""Update the Cholesky decomposition of (:math:`X^T X + \Lambda`). + def _update_cholesky(self): + r"""Update the Cholesky decomposition of the regularized Gram matrix. Raises ------ @@ -174,53 +176,53 @@ def update_cholesky(self): If the matrix is not positive definite. 
""" try: - self.XtXp_lambda_chol = np.linalg.cholesky(self.XtXp_lambda) + self.gram_reg_chol_ = np.linalg.cholesky(self.gram_reg_) except np.linalg.LinAlgError: raise SingularMatrixError( "Failed to compute Cholesky decomposition. Matrix may not be positive " "definite." ) - def update_lambda_s(self, groups: GroupedFeatures, lambdas: np.ndarray): + def set_params(self, groups: GroupedFeatures, alpha: np.ndarray): r"""Update the regularization parameters and recompute the Cholesky decomposition. Parameters ---------- groups : GroupedFeatures The grouped features object. - lambdas : np.ndarray - The new :math:`\lambda` values for each group. + alpha : np.ndarray + The new :math:`\alpha` values for each group. Returns ------- None """ - diag = groups.group_expand(lambdas) - self.XtXp_lambda = self.XtX + np.diag(diag) - self.update_cholesky() + diag = groups.group_expand(alpha) + self.gram_reg_ = self.gram_matrix_ + np.diag(diag) + self._update_cholesky() - def trace_XtX(self) -> float: - """Compute the trace of :math:`X^T X`. + def _trace_gram_matrix(self) -> float: + """Compute the trace of the Gram matrix. Returns ------- float - The trace of :math:`X^T X`. + The trace of the Gram matrix. """ - return np.trace(self.XtX) + return np.trace(self.gram_matrix_) - def XtXp_lambda_ldiv_XtX(self) -> np.ndarray: - r"""Compute :math:`(X^T X + \Lambda)^{-1} X^T X` using Cholesky decomposition. + def _solve_gram_system(self) -> np.ndarray: + r"""Compute :math:`(X^T X + \alpha I)^{-1} X^T X` using Cholesky decomposition. Returns ------- np.ndarray The result of the computation. """ - return cho_solve((self.XtXp_lambda_chol, self.lower), self.XtX) + return cho_solve((self.gram_reg_chol_, self.lower_), self.gram_matrix_) - def ldiv(self, B: np.ndarray) -> np.ndarray: - r"""Solve the system :math:`(X^T X + \Lambda) x = B` using Cholesky decomposition. + def _solve_system(self, B: np.ndarray) -> np.ndarray: + r"""Solve the system :math:`(X^T X + \alpha I) x = B` using Cholesky decomposition. Parameters ---------- @@ -232,15 +234,14 @@ def ldiv(self, B: np.ndarray) -> np.ndarray: np.ndarray The solution vector :math:`x`. """ - return cho_solve((self.XtXp_lambda_chol, self.lower), B) + return cho_solve((self.gram_reg_chol_, self.lower_), B) -class WoodburyRidgePredictor(AbstractRidgePredictor): +class WoodburyRidgePredictor(BaseRidgePredictor): r"""Ridge predictor using the Woodbury matrix identity for efficient matrix inversion. - This class is suitable for scenarios where the number of features (:math:`p`) is greater - than the number of samples (:math:`n`), leveraging the Woodbury matrix identity to solve - the Ridge regression problem efficiently. + This class is suitable for scenarios where the number of features is greater + than the number of samples. The Woodbury matrix identity states: @@ -248,26 +249,31 @@ class WoodburyRidgePredictor(AbstractRidgePredictor): (A + UCV)^{-1} = A^{-1} - A^{-1}U(C^{-1} + V A^{-1} U)^{-1} V A^{-1} In the context of Ridge regression: - :math:`A = \Lambda` (diagonal matrix of regularization parameters) + :math:`A = \alpha I` (diagonal matrix of regularization parameters) :math:`U = X^T` :math:`C = I` (identity matrix) :math:`V = X` + Parameters + ---------- + X : np.ndarray of shape (n_samples, n_features) + Training data. + Attributes ---------- - n : int + n_samples_ : int Number of samples. - p : int + n_features_ : int Number of features. - X : np.ndarray - The design matrix. - XtX : np.ndarray - The :math:`X^T X` matrix. 
- A_inv : np.ndarray - The inverse of :math:`\Lambda`. - U : np.ndarray + X_ : np.ndarray of shape (n_samples, n_features) + The training data. + gram_matrix_ : np.ndarray of shape (n_features, n_features) + The Gram matrix :math:`X^T X`. + alpha_inv_ : np.ndarray of shape (n_features, n_features) + The inverse of :math:`\alpha I`. + U_ : np.ndarray of shape (n_features, n_samples) Matrix :math:`U` in the Woodbury identity. - V : np.ndarray + V_ : np.ndarray of shape (n_samples, n_features) Matrix :math:`V` in the Woodbury identity. """ @@ -276,8 +282,8 @@ def __init__(self, X: np.ndarray): Parameters ---------- - X : np.ndarray - The design matrix. + X : np.ndarray of shape (n_samples, n_features) + Training data. Raises ------ @@ -291,40 +297,40 @@ def __init__(self, X: np.ndarray): if np.any(np.isnan(X)) or np.any(np.isinf(X)): raise ValueError("X contains NaN or infinity values.") - self.n, self.p = X.shape - self.X = X - self.XtX = X.T @ X - self.A_inv = np.eye(self.p) - self.U = X.T - self.V = X + self.n_samples_, self.n_features_ = X.shape + self.X_ = X + self.gram_matrix_ = X.T @ X + self.alpha_inv_ = np.eye(self.n_features_) + self.U_ = X.T + self.V_ = X - def update_lambda_s(self, groups: GroupedFeatures, lambdas: np.ndarray): + def set_params(self, groups: GroupedFeatures, alpha: np.ndarray): r"""Update the regularization parameters and recompute the inverse matrix. Parameters ---------- groups : GroupedFeatures The grouped features object. - lambdas : np.ndarray - The new :math:`\lambda` values for each group. + alpha : np.ndarray + The new :math:`\alpha` values for each group. Raises ------ ValueError - If lambdas contain non-positive values. + If alpha contains non-positive values. Returns ------- None """ - if np.any(lambdas <= 0): - raise ValueError("Lambda values must be positive.") + if np.any(alpha <= 0): + raise ValueError("Alpha values must be positive.") - diag = np.array(groups.group_expand(lambdas)) - self.A_inv = np.diag(1 / diag) - self.woodbury_update() + diag = np.array(groups.group_expand(alpha)) + self.alpha_inv_ = np.diag(1 / diag) + self._woodbury_update() - def woodbury_update(self): + def _woodbury_update(self): """Apply the Woodbury matrix identity to update the inverse matrix. Raises @@ -333,37 +339,37 @@ def woodbury_update(self): If numerical instability is detected during the update. """ try: - eye = np.eye(self.n) - AU = self.A_inv @ self.U - inv_term = np.linalg.inv(eye + self.V @ AU) - self.A_inv -= AU @ inv_term @ self.V @ self.A_inv + eye = np.eye(self.n_samples_) + AU = self.alpha_inv_ @ self.U_ + inv_term = np.linalg.inv(eye + self.V_ @ AU) + self.alpha_inv_ -= AU @ inv_term @ self.V_ @ self.alpha_inv_ except np.linalg.LinAlgError: raise NumericalInstabilityError( "Numerical instability detected in Woodbury update." ) - def trace_XtX(self) -> float: - """Compute the trace of :math:`X^T X`. + def _trace_gram_matrix(self) -> float: + """Compute the trace of the Gram matrix. Returns ------- float - The trace of :math:`X^T X`. + The trace of the Gram matrix. """ - return np.trace(self.XtX) + return np.trace(self.gram_matrix_) - def XtXp_lambda_ldiv_XtX(self) -> np.ndarray: - r"""Compute :math:`(X^T X + \Lambda)^{-1} X^T X`. + def _solve_gram_system(self) -> np.ndarray: + r"""Compute :math:`(X^T X + \alpha I)^{-1} X^T X`. Returns ------- np.ndarray The result of the computation. 
""" - return self.A_inv @ self.XtX + return self.alpha_inv_ @ self.gram_matrix_ - def ldiv(self, B: np.ndarray) -> np.ndarray: - r"""Solve the system :math:`(X^T X + \Lambda) x = B`. + def _solve_system(self, B: np.ndarray) -> np.ndarray: + r"""Solve the system :math:`(X^T X + \alpha I) x = B`. Parameters ---------- @@ -375,15 +381,14 @@ def ldiv(self, B: np.ndarray) -> np.ndarray: np.ndarray The solution vector :math:`x`. """ - return self.A_inv @ B + return self.alpha_inv_ @ B -class ShermanMorrisonRidgePredictor(AbstractRidgePredictor): +class ShermanMorrisonRidgePredictor(BaseRidgePredictor): r"""Ridge predictor using the Sherman-Morrison formula for efficient matrix updates. - This class is suitable for scenarios where the number of features (:math:`p`) is much - greater than the number of samples (:math:`n`), leveraging the Sherman-Morrison formula - to efficiently update the inverse of (:math:`X^T X + \Lambda`) as :math:`\Lambda` changes. + This class is suitable for scenarios where the number of features is much + greater than the number of samples. The Sherman-Morrison formula states: @@ -392,23 +397,28 @@ class ShermanMorrisonRidgePredictor(AbstractRidgePredictor): where :math:`A` is the current inverse, and :math:`uv^T` represents a rank-one update. + Parameters + ---------- + X : np.ndarray of shape (n_samples, n_features) + Training data. + Attributes ---------- - n : int + n_samples_ : int Number of samples. - p : int + n_features_ : int Number of features. - X : np.ndarray - The design matrix. - XtX : np.ndarray - The regularized :math:`X^T X` matrix. - A : np.ndarray - The matrix (:math:`I + X^T X`). - A_inv : np.ndarray + X_ : np.ndarray of shape (n_samples, n_features) + The training data. + gram_matrix_ : np.ndarray of shape (n_features, n_features) + The Gram matrix :math:`X^T X`. + A_ : np.ndarray of shape (n_features, n_features) + The matrix :math:`I + X^T X`. + A_inv_ : np.ndarray of shape (n_features, n_features) The inverse of matrix :math:`A`. - U : np.ndarray + U_ : np.ndarray of shape (n_features, n_samples) Matrix :math:`U` used for efficiency in updates. - V : np.ndarray + V_ : np.ndarray of shape (n_samples, n_features) Matrix :math:`V` used for efficiency in updates. """ @@ -417,8 +427,8 @@ def __init__(self, X: np.ndarray): Parameters ---------- - X : np.ndarray - The design matrix. + X : np.ndarray of shape (n_samples, n_features) + Training data. Raises ------ @@ -432,45 +442,45 @@ def __init__(self, X: np.ndarray): if np.any(np.isnan(X)) or np.any(np.isinf(X)): raise ValueError("X contains NaN or infinity values.") - self.n, self.p = X.shape - self.X = X - self.XtX = self.X.T @ self.X / self.n - self.A = np.eye(self.p) + self.XtX # A = I + XtX - self.A_inv = np.linalg.inv(self.A) # Initialize A_inv as inv(A) - self.U = self.X.T / np.sqrt(self.n) # Precompute for efficiency - self.V = self.X / np.sqrt(self.n) + self.n_samples_, self.n_features_ = X.shape + self.X_ = X + self.gram_matrix_ = self.X_.T @ X / self.n_samples_ + self.A_ = np.eye(self.n_features_) + self.gram_matrix_ + self.A_inv_ = np.linalg.inv(self.A_) + self.U_ = self.X_.T / np.sqrt(self.n_samples_) + self.V_ = self.X_ / np.sqrt(self.n_samples_) - def update_lambda_s(self, groups: GroupedFeatures, lambdas: np.ndarray): - r"""Update the regularization parameters (:math:`\lambda`) and adjust A_inv accordingly. + def set_params(self, groups: GroupedFeatures, alpha: np.ndarray): + r"""Update the regularization parameters and adjust A_inv_ accordingly. 
Parameters ---------- groups : GroupedFeatures The grouped features object. - lambdas : np.ndarray - The new :math:`\lambda` values for each group. + alpha : np.ndarray + The new :math:`\alpha` values for each group. Raises ------ ValueError - If lambdas contain negative values. + If alpha contains negative values. Returns ------- None """ - if np.any(lambdas < 0): - raise ValueError("Lambda values must be non-negative.") + if np.any(alpha < 0): + raise ValueError("Alpha values must be non-negative.") - diag = groups.group_expand(lambdas) - self.A = np.diag(diag) + self.XtX # Update A with new Lambda + diag = groups.group_expand(alpha) + self.A_ = np.diag(diag) + self.gram_matrix_ try: - self.A_inv = np.linalg.inv(self.A) # Recompute A_inv + self.A_inv_ = np.linalg.inv(self.A_) except np.linalg.LinAlgError: raise SingularMatrixError("Failed to invert A. Matrix may be singular.") - def sherman_morrison(self, u: np.ndarray, v: np.ndarray): - """Apply the Sherman-Morrison formula to update A_inv with a rank-one update. + def _sherman_morrison_update(self, u: np.ndarray, v: np.ndarray): + """Apply the Sherman-Morrison formula to update A_inv_ with a rank-one update. Parameters ---------- @@ -488,37 +498,37 @@ def sherman_morrison(self, u: np.ndarray, v: np.ndarray): ------- None """ - Au = self.A_inv @ u - vA = v @ self.A_inv + Au = self.A_inv_ @ u + vA = v @ self.A_inv_ denominator = 1.0 + v @ Au if abs(denominator) < 1e-10: raise NumericalInstabilityError( "Denominator in Sherman-Morrison update is close to zero." ) - self.A_inv -= np.outer(Au, vA) / denominator + self.A_inv_ -= np.outer(Au, vA) / denominator - def trace_XtX(self) -> float: - r"""Compute the trace of :math:`X^T X`. + def _trace_gram_matrix(self) -> float: + r"""Compute the trace of the Gram matrix. Returns ------- float - The trace of :math:`X^T X`. + The trace of the Gram matrix. """ - return np.trace(self.XtX) + return np.trace(self.gram_matrix_) - def XtXp_lambda_ldiv_XtX(self) -> np.ndarray: - r"""Compute :math:`(X^T X + \Lambda)^{-1} X^T X`. + def _solve_gram_system(self) -> np.ndarray: + r"""Compute :math:`(X^T X + \alpha I)^{-1} X^T X`. Returns ------- np.ndarray The result of the computation. """ - return self.ldiv(self.XtX) + return self._solve_system(self.gram_matrix_) - def ldiv(self, B: np.ndarray) -> np.ndarray: - r"""Solve the system :math:`(X^T X + \Lambda)^{-1} B` using the precomputed A_inv. + def _solve_system(self, B: np.ndarray) -> np.ndarray: + r"""Solve the system :math:`(X^T X + \alpha I)^{-1} B` using the precomputed A_inv_. Parameters ---------- @@ -531,15 +541,15 @@ def ldiv(self, B: np.ndarray) -> np.ndarray: The solution vector :math:`x`. """ if B.ndim == 1: - return self.A_inv @ B + return self.A_inv_ @ B else: - return self.A_inv @ B + return self.A_inv_ @ B @staticmethod - def sherman_morrison_formula( + def _sherman_morrison_formula( A: np.ndarray, u: np.ndarray, v: np.ndarray ) -> np.ndarray: - """Apply the Sherman-Morrison formula to matrix :math:`A` with vectors :math:`u` and :math:`v`. + """Apply the Sherman-Morrison formula to matrix A with vectors u and v. Parameters ---------- @@ -568,182 +578,178 @@ def sherman_morrison_formula( return A - np.outer(Au, vA) / denominator -class BasicGroupRidgeWorkspace: - r"""Workspace for performing group Ridge regression, including fitting and prediction. +class GroupRidgeRegressor(BaseEstimator, RegressorMixin): + r"""Ridge regression with grouped features. 
- This class manages the entire workflow of group-wise Ridge regression, handling - the fitting process, parameter updates, predictions, and evaluation metrics. It - leverages Ridge predictors (`CholeskyRidgePredictor`, `WoodburyRidgePredictor`, - `ShermanMorrisonRidgePredictor`) based on the dimensionality of the data to ensure - computational efficiency. + This class implements Ridge regression with feature groups, automatically selecting + the most efficient solver based on the problem dimensions. It supports Cholesky + decomposition for n_samples > n_features, Woodbury identity for moderate-sized + problems, and Sherman-Morrison updates for very high-dimensional problems. - Attributes + Parameters ---------- - X : np.ndarray - The design matrix. - Y : np.ndarray - The target vector. groups : GroupedFeatures The grouped features object. - n : int + alpha : Union[np.ndarray, dict], default=None + The regularization parameters. Can be a numpy array or a dictionary + mapping group names to :math:`\alpha` values. If None, uses ones. + + Attributes + ---------- + n_features_in_ : int + Number of features seen during fit. + feature_names_in_ : ndarray of shape (n_features_in_,) + Names of features seen during fit. + n_samples_ : int Number of samples. - p : int + n_features_ : int Number of features. - predictor : AbstractRidgePredictor + groups_ : GroupedFeatures + The grouped features object. + predictor_ : BaseRidgePredictor The Ridge predictor being used. - XtY : np.ndarray - The :math:`X^T Y` matrix scaled by the number of samples. - lambdas : np.ndarray + gram_target_ : np.ndarray of shape (n_features,) + The :math:`X^T y` matrix scaled by n_samples. + alpha_ : np.ndarray of shape (n_groups,) Regularization parameters for each group. - beta_current : np.ndarray - Current coefficient estimates. - Y_hat : np.ndarray - Predicted values. - XtXp_lambda_inv : np.ndarray - Inverse of (:math:`X^T X + \Lambda`). - moment_setup : MomentTunerSetup - Moment setup for parameter tuning. - leverage_store : np.ndarray + coef_ : np.ndarray of shape (n_features,) + Coefficient vector. + leverage_ : np.ndarray of shape (n_samples,) Leverage scores for each sample. + + Notes + ----- + The solver selection is based on the following rules: + - If n_features <= n_samples: Use Cholesky decomposition + - If n_samples < n_features < 4 * n_samples: Use Woodbury identity + - If n_features >= 4 * n_samples: Use Sherman-Morrison updates """ - def __init__(self, X: np.ndarray, Y: np.ndarray, groups: GroupedFeatures): - """Initialize the BasicGroupRidgeWorkspace. + def __init__(self, groups: GroupedFeatures, alpha: Union[np.ndarray, dict] = None): + self.groups = groups + self.alpha = alpha - Parameters - ---------- - X : np.ndarray - The design matrix. - Y : np.ndarray - The target vector. - groups : GroupedFeatures - The grouped features object. + def _validate_params(self): + """Validate parameters. Raises ------ ValueError - If `groups` does not contain any groups or if any group has zero size. + If groups does not contain any groups or if any group has zero size. 
""" - if not groups.ps: + if not self.groups.ps: raise ValueError("GroupedFeatures must contain at least one group.") - if any(p == 0 for p in groups.ps): + if any(p == 0 for p in self.groups.ps): raise ValueError("GroupedFeatures groups must have non-zero sizes.") - self.X = X - self.Y = Y - self.groups = groups - self.n, self.p = X.shape - # Initialize predictor based on p and n - if self.p <= self.n: - self.predictor = CholeskyRidgePredictor(X) - elif self.p > self.n and self.p < 4 * self.n: - self.predictor = WoodburyRidgePredictor(X) - else: - self.predictor = ShermanMorrisonRidgePredictor(X) - - self.XtY = np.dot(X.T, Y) / self.n - self.lambdas = np.ones(groups.num_groups) - self.update_lambda_s(self.lambdas) - self.beta_current = self.predictor.ldiv(self.XtY) - self.Y_hat = np.dot(X, self.beta_current) - - # Compute (:math:`X^T X + \Lambda`)^{-1} - self.XtXp_lambda_inv = self.predictor.ldiv(np.eye(self.p)) - - self.moment_setup = MomentTunerSetup(self) - - def update_lambda_s(self, lambdas: np.ndarray): - r"""Update the regularization parameters (:math:`\lambda`). + def fit(self, X: np.ndarray, y: np.ndarray): + """Fit the Ridge regression model. Parameters ---------- - lambdas : np.ndarray - New :math:`\lambda` values for each group. + X : np.ndarray of shape (n_samples, n_features) + Training data. + y : np.ndarray of shape (n_samples,) + Target values. Returns ------- - None + self : object + Returns self. """ - self.lambdas = lambdas - self.predictor.update_lambda_s(self.groups, self.lambdas) - self.XtXp_lambda_div_Xt = self.predictor.ldiv(self.X.T) + self._validate_params() + X, y = check_X_y(X, y, accept_sparse=False) + self.y_ = y # Store y for later use + + self.n_features_in_ = X.shape[1] + self.n_samples_, self.n_features_ = X.shape + self.groups_ = self.groups + + # Initialize predictor based on problem dimensions + if self.n_features_ <= self.n_samples_: + self.predictor_ = CholeskyRidgePredictor(X) + elif self.n_features_ > self.n_samples_ and self.n_features_ < 4 * self.n_samples_: + self.predictor_ = WoodburyRidgePredictor(X) + else: + self.predictor_ = ShermanMorrisonRidgePredictor(X) - def ngroups(self) -> int: - """Get the number of feature groups. + self.gram_target_ = np.dot(X.T, y) / self.n_samples_ - Returns - ------- - int - Number of groups. - """ - return self.groups.ngroups() + # Set alpha values + if self.alpha is None: + self.alpha_ = np.ones(self.groups.num_groups) + elif isinstance(self.alpha, dict): + self.alpha_ = np.array(list(self.alpha.values())) + else: + self.alpha_ = np.asarray(self.alpha) - def coef(self) -> np.ndarray: - """Get the current coefficient estimates (:math:`\beta`). + self.predictor_.set_params(self.groups_, self.alpha_) + self.coef_ = self.predictor_._solve_system(self.gram_target_) + self.y_pred_ = np.dot(X, self.coef_) - Returns - ------- - np.ndarray - Coefficient vector :math:`\beta`. - """ - return self.beta_current + # Store gram_reg_inv_ for MomentTunerSetup + self.gram_reg_inv_ = self.predictor_._solve_system(np.eye(self.n_features_)) - def islinear(self) -> bool: - """Check if the model is linear. - - Returns - ------- - bool - Always returns True for Ridge regression. - """ - return True + # Compute leverage scores + self.gram_reg_inv_X_ = ( + self.predictor_._solve_system(X.T).T / self.n_samples_ + ) + self.leverage_ = np.sum(X * self.gram_reg_inv_X_, axis=1) - def leverage(self) -> np.ndarray: - """Get the leverage scores (:math:`h_i`). + return self - Returns - ------- - np.ndarray - Leverage scores vector. 
- """ - return self.leverage_store + def predict(self, X: np.ndarray) -> np.ndarray: + """Predict using the Ridge regression model. - def modelmatrix(self) -> np.ndarray: - """Get the design matrix (:math:`X`). + Parameters + ---------- + X : np.ndarray of shape (n_samples_new, n_features) + New data. Returns ------- - np.ndarray - Design matrix :math:`X`. + np.ndarray of shape (n_samples_new,) + Predicted values. """ - return self.X + X = check_array(X, accept_sparse=False) + return np.dot(X, self.coef_) - def predict(self, X_new: np.ndarray) -> np.ndarray: - """Make predictions for new data. + def get_params(self, deep: bool = True) -> Dict[str, Any]: + """Get parameters for this estimator. Parameters ---------- - X_new : np.ndarray - New design matrix. + deep : bool, default=True + If True, will return the parameters for this estimator and + contained subobjects that are estimators. Returns ------- - np.ndarray - Predicted values. + params : Dict[str, Any] + Parameter names mapped to their values. """ - return np.dot(X_new, self.coef()) + return {"groups": self.groups, "alpha": self.alpha} + + def set_params(self, **params) -> "GroupRidgeRegressor": + """Set the parameters of this estimator. - def response(self) -> np.ndarray: - """Get the response variable (:math:`Y`). + Parameters + ---------- + **params : dict + Estimator parameters. Returns ------- - np.ndarray - Response vector :math:`Y`. + self : object + Estimator instance. """ - return self.Y + if "groups" in params: + self.groups = params["groups"] + if "alpha" in params: + self.alpha = params["alpha"] + return self - def loo_error(self) -> float: + def get_loo_error(self) -> float: """Compute the leave-one-out (LOO) error. Returns @@ -751,16 +757,16 @@ def loo_error(self) -> float: float The computed LOO error. """ - return np.mean((self.Y - self.Y_hat) ** 2 / (1.0 - self.leverage_store) ** 2) + return np.mean((self.y_ - self.y_pred_) ** 2 / (1.0 - self.leverage_) ** 2) - def mse_ridge(self, X_test: np.ndarray, Y_test: np.ndarray) -> float: + def get_mse(self, X_test: np.ndarray, y_test: np.ndarray) -> float: """Compute the mean squared error (MSE) on test data. Parameters ---------- - X_test : np.ndarray + X_test : np.ndarray of shape (n_samples_test, n_features) Test design matrix. - Y_test : np.ndarray + y_test : np.ndarray of shape (n_samples_test,) Test response vector. Returns @@ -768,89 +774,46 @@ def mse_ridge(self, X_test: np.ndarray, Y_test: np.ndarray) -> float: float The computed MSE. """ - return np.mean((Y_test - np.dot(X_test, self.beta_current)) ** 2) - - def fit(self, lambdas: Union[np.ndarray, dict]): - r"""Fit the Ridge regression model with given regularization parameters. - - Parameters - ---------- - lambdas : Union[np.ndarray, dict] - The regularization parameters. Can be a numpy array or a dictionary - mapping group names to :math:`\lambda` values. - - Returns - ------- - float - The leave-one-out (LOO) error after fitting. - - Raises - ------ - ValueError - If the provided lambdas are invalid. 
- """ - if isinstance(lambdas, dict): - lambdas = np.array(list(lambdas.values())) - self.lambdas = lambdas - self.predictor.update_lambda_s(self.groups, self.lambdas) - self.beta_current = self.predictor.ldiv(self.XtY) - self.Y_hat = np.dot(self.X, self.beta_current) - self.XtXp_lambda_div_Xt = self.predictor.ldiv(self.X.T).T / self.n - self.leverage_store = np.sum(self.X * self.XtXp_lambda_div_Xt, axis=1) - return self.loo_error() + X_test = check_array(X_test, accept_sparse=False) + return np.mean((y_test - self.predict(X_test)) ** 2) -def lambda_lolas_rule(rdg: BasicGroupRidgeWorkspace, multiplier: float = 0.1) -> float: - r"""Compute the regularization parameter :math:`\lambda` using the Panagiotis Lolas rule. +def lambda_lolas_rule(rdg: GroupRidgeRegressor, multiplier: float = 0.1) -> float: + r"""Compute the regularization parameter using the Panagiotis Lolas rule. The Lolas rule provides a heuristic for selecting the regularization parameter - based on the model's degrees of freedom and the trace of :math:`X^T X`. This method - balances the complexity of the model against its fit to the training data. + based on the model's degrees of freedom and the trace of :math:`X^T X`. - The :math:`\lambda` is computed as: + The :math:`\alpha` is computed as: .. math:: - \lambda = \text{multiplier} \times \frac{p^2}{n \times \text{trace}(X^T X)} - - where: - - multiplier : float, default=0.1 - A scalar multiplier for the rule. - - :math:`p` : int - Number of features. - - :math:`n` : int - Number of samples. - - :math:`\text{trace}(X^T X)` : float - The trace of the covariance matrix :math:`X^T X`. - - This formula scales the regularization parameter based on both the - dimensionality of the data and the trace of the covariance matrix, ensuring - that :math:`\lambda` is appropriately tuned for the given dataset. + \alpha = \text{multiplier} \times \frac{n_{\text{features}}^2}{n_{\text{samples}} \times \text{trace}(X^T X)} Parameters ---------- - rdg : BasicGroupRidgeWorkspace - The Ridge regression workspace containing model parameters. + rdg : GroupRidgeRegressor + The Ridge regression estimator. multiplier : float, default=0.1 A scalar multiplier for the rule. Returns ------- float - The computed :math:`\lambda` value. + The computed :math:`\alpha` value. Raises ------ ValueError - If multiplier is not positive or if :math:`\text{trace}(X^T X)` is zero. + If multiplier is not positive or if trace(X^T X) is zero. """ if multiplier <= 0: raise ValueError("Multiplier must be positive.") - trace_XtX = rdg.predictor.trace_XtX() - if trace_XtX == 0: + trace_gram = rdg.predictor_._trace_gram_matrix() + if trace_gram == 0: raise ValueError("Trace of X^T X is zero, leading to division by zero.") - return multiplier * rdg.p**2 / rdg.n / trace_XtX + return multiplier * rdg.n_features_**2 / rdg.n_samples_ / trace_gram class MomentTunerSetup: @@ -877,41 +840,30 @@ class MomentTunerSetup: :math:`M^2` matrix computed as (:math:`p_s \times p_s^T) / n^2`. """ - def __init__(self, rdg: BasicGroupRidgeWorkspace): - """ - Initialize the MomentTunerSetup. - - Parameters - ---------- - rdg : BasicGroupRidgeWorkspace - An instance of BasicGroupRidgeWorkspace containing the model's current - state. - - Raises - ------ - ValueError - If the length of N_matrix does not match the number of features. 
- """ - self.groups = rdg.groups - self.ps = np.array(rdg.groups.ps) - self.n = rdg.n - self.beta_norms_squared = np.array( - rdg.groups.group_summary(rdg.beta_current, lambda x: np.sum(np.abs(x) ** 2)) + def __init__(self, rdg: GroupRidgeRegressor): + self.groups_ = rdg.groups_ + self.n_features_per_group_ = np.array(rdg.groups_.ps) + self.n_samples_ = rdg.n_samples_ + self.coef_norms_squared_ = np.array( + rdg.groups_.group_summary(rdg.coef_, lambda x: np.sum(np.abs(x) ** 2)) ) - N_matrix = rdg.XtXp_lambda_inv # Use the (p, p) inverse matrix - if N_matrix.shape[1] != self.ps.sum(): + gram_inv_matrix = rdg.gram_reg_inv_ # Use the (p, p) inverse matrix + if gram_inv_matrix.shape[1] != self.n_features_per_group_.sum(): raise ValueError( - f"Length of N_matrix ({N_matrix.shape[1]}) does not match number of " - f"features ({self.ps.sum()})" + f"Length of gram_inv_matrix ({gram_inv_matrix.shape[1]}) does not match " + f"number of features ({self.n_features_per_group_.sum()})" ) - self.N_norms_squared = np.array( - rdg.groups.group_summary(N_matrix, lambda x: np.sum(np.abs(x) ** 2)) + self.gram_inv_norms_squared_ = np.array( + rdg.groups_.group_summary(gram_inv_matrix, lambda x: np.sum(np.abs(x) ** 2)) + ) + self.moment_matrix_ = ( + np.outer(self.n_features_per_group_, self.n_features_per_group_) + / self.n_samples_**2 ) - self.M_squared = np.outer(self.ps, self.ps) / self.n**2 def sigma_squared_path( - rdg: BasicGroupRidgeWorkspace, mom: MomentTunerSetup, sigma_s_squared: np.ndarray + rdg: GroupRidgeRegressor, mom: MomentTunerSetup, sigma_s_squared: np.ndarray ): r"""Compute the regularization path for different values of :math:`\sigma^2`. @@ -922,37 +874,37 @@ def sigma_squared_path( For each :math:`\sigma^2` in the provided array: - 1. Compute :math:`\lambda`: - :math:`\lambda = \text{get_lambdas}(mom, \sigma^2)` + 1. Compute :math:`\alpha`: + :math:`\alpha = \text{get_lambdas}(mom, \sigma^2)` 2. Fit Ridge Model: - Fit the Ridge regression model using the computed :math:`\lambda` and evaluate: - LOO Error = rdg.fit(:math:`\lambda`) + Fit the Ridge regression model using the computed :math:`\alpha` and evaluate: + LOO Error = rdg.fit(:math:`\alpha`) 3. Store Coefficients: - :math:`\beta = rdg.beta_current` + :math:`\beta = rdg.coef_` 4. Aggregate Results: - Collect the :math:`\lambda` values, LOO errors, and :math:`\beta` coefficients for each :math:`\sigma^2`. + Collect the :math:`\alpha` values, LOO errors, and coefficients for each :math:`\sigma^2`. Parameters ---------- - rdg : BasicGroupRidgeWorkspace - The Ridge regression workspace. + rdg : GroupRidgeRegressor + The Ridge regression estimator. mom : MomentTunerSetup The moment tuner setup containing necessary statistics. - sigma_s_squared : np.ndarray + sigma_s_squared : np.ndarray of shape (n_sigmas,) An array of :math:`\sigma^2` values to evaluate. Returns ------- dict A dictionary containing the regularization path information: - - 'lambdas' : np.ndarray - Array of :math:`\lambda` values for each :math:`\sigma^2`. - - 'loos' : np.ndarray + - 'alphas' : np.ndarray of shape (n_sigmas, n_groups) + Array of :math:`\alpha` values for each :math:`\sigma^2`. + - 'errors' : np.ndarray of shape (n_sigmas,) Array of LOO errors corresponding to each :math:`\sigma^2`. - - 'betas' : np.ndarray + - 'coefs' : np.ndarray of shape (n_sigmas, n_features) Array of coefficient vectors corresponding to each :math:`\sigma^2`. 
Raises @@ -963,29 +915,29 @@ def sigma_squared_path( if np.any(sigma_s_squared < 0): raise ValueError("sigma_s_squared values must be non-negative.") - n_sigma = len(sigma_s_squared) - n_groups = rdg.ngroups() - loos_hat = np.zeros(n_sigma) - lambdas = np.zeros((n_sigma, n_groups)) - betas = np.zeros((n_sigma, rdg.groups.p)) + n_sigmas = len(sigma_s_squared) + n_groups = rdg.get_n_groups() + errors = np.zeros(n_sigmas) + alphas = np.zeros((n_sigmas, n_groups)) + coefs = np.zeros((n_sigmas, rdg.groups_.p)) for i, sigma_sq in enumerate(sigma_s_squared): try: - lambdas_tmp = get_lambdas(mom, sigma_sq) - lambdas[i, :] = lambdas_tmp - loos_hat[i] = rdg.fit(lambdas_tmp) - betas[i, :] = rdg.beta_current + alpha_tmp = get_lambdas(mom, sigma_sq) + alphas[i, :] = alpha_tmp + errors[i] = rdg.fit(alpha_tmp) + coefs[i, :] = rdg.coef_ except RidgeRegressionError as e: print(f"Error at :math:`\sigma^2 = {sigma_sq}`: {str(e)}") - return {"lambdas": lambdas, "loos": loos_hat, "betas": betas} + return {"alphas": alphas, "errors": errors, "coefs": coefs} def get_lambdas(mom: MomentTunerSetup, sigma_sq: float) -> np.ndarray: - r"""Compute :math:`\lambda` values for a given :math:`\sigma^2`. + r"""Compute :math:`\alpha` values for a given :math:`\sigma^2`. - This function calculates the regularization parameters (:math:`\lambda`) for each - feature group based on moment-based statistics. The computed :math:`\lambda` balances + This function calculates the regularization parameters (:math:`\alpha`) for each + feature group based on moment-based statistics. The computed :math:`\alpha` balances the regularization strength across different groups, ensuring that groups with more features or higher variance receive appropriate penalization. @@ -994,58 +946,58 @@ def get_lambdas(mom: MomentTunerSetup, sigma_sq: float) -> np.ndarray: 1. Compute :math:`\alpha^2` for each group: :math:`\alpha_g^2 = \max(\|\beta_g\|^2 - \sigma^2 \|N_g\|^2, 0) / p_g` - 2. Compute :math:`\gamma_s` for each group: - :math:`\gamma_g = p_g / n` + 2. Compute group ratios: + :math:`r_g = p_g / n` - 3. Compute :math:`\lambda` for each group: - :math:`\lambda_g = (\sigma^2 \gamma_g) / \alpha_g^2` + 3. Compute :math:`\alpha` for each group: + :math:`\alpha_g = (\sigma^2 r_g) / \alpha_g^2` Parameters ---------- mom : MomentTunerSetup The moment tuner setup containing necessary statistics. sigma_sq : float - The :math:`\sigma^2` value for which to compute :math:`\lambda`. + The :math:`\sigma^2` value for which to compute :math:`\alpha`. Returns ------- - np.ndarray - The computed :math:`\lambda` values for each feature group. + np.ndarray of shape (n_groups,) + The computed :math:`\alpha` values for each feature group. Raises ------ ValueError If sigma_sq is negative. NumericalInstabilityError - If division by zero occurs during lambda computation. + If division by zero occurs during alpha computation. """ if sigma_sq < 0: raise ValueError("sigma_sq must be non-negative.") - alpha_sq = get_alpha_s_squared(mom, sigma_sq) - gamma_s = np.array(mom.ps) / mom.n + alpha_squared = get_alpha_s_squared(mom, sigma_sq) + group_ratios = np.array(mom.n_features_per_group_) / mom.n_samples_ LARGE_VALUE = 1e12 with np.errstate(divide="ignore", invalid="ignore"): - lambdas = sigma_sq * gamma_s / alpha_sq - zero_alpha = alpha_sq == 0 + alphas = sigma_sq * group_ratios / alpha_squared + zero_alpha = alpha_squared == 0 if np.any(zero_alpha): warnings.warn( - f"alpha_sq has zero values for groups: {np.where(zero_alpha)[0]}. 
" - "Assigning large lambda values to these groups." + f"alpha_squared has zero values for groups: {np.where(zero_alpha)[0]}. " + "Assigning large alpha values to these groups." ) - lambdas = np.where(zero_alpha, LARGE_VALUE, lambdas) + alphas = np.where(zero_alpha, LARGE_VALUE, alphas) - return lambdas + return alphas def get_alpha_s_squared(mom: MomentTunerSetup, sigma_sq: float) -> np.ndarray: - r"""Compute :math:`\alpha_s^2` values for a given :math:`\sigma^2` using Non-Negative Least Squares + r"""Compute :math:`\alpha^2` values for a given :math:`\sigma^2` using Non-Negative Least Squares (NNLS). - This function calculates the :math:`\alpha_s^2` values required for determining the - regularization parameters (:math:`\lambda`) in Ridge regression. The :math:`\alpha_s^2` values + This function calculates the :math:`\alpha^2` values required for determining the + regularization parameters (:math:`\alpha`) in Ridge regression. The :math:`\alpha^2` values encapsulate the balance between the coefficient norms and the influence of the design matrix, adjusted by :math:`\sigma^2`. @@ -1055,46 +1007,46 @@ def get_alpha_s_squared(mom: MomentTunerSetup, sigma_sq: float) -> np.ndarray: :math:`\text{RHS}_g = \|\beta_g\|^2 - \sigma^2 \|N_g\|^2` 2. Solve the NNLS problem: - :math:`\min \| M_{\text{squared}} \times \alpha_{\text{sq\_by\_p}} - \text{RHS} \|_2^2` - subject to :math:`\alpha_{\text{sq\_by\_p}} \geq 0` + :math:`\min \| M \times \alpha_{\text{per\_group}} - \text{RHS} \|_2^2` + subject to :math:`\alpha_{\text{per\_group}} \geq 0` - 3. Compute :math:`\alpha_g^2`: - :math:`\alpha_g^2 = \alpha_{\text{sq\_by\_p}} \times p_g` + 3. Compute :math:`\alpha^2`: + :math:`\alpha^2 = \alpha_{\text{per\_group}} \times p_g` Parameters ---------- mom : MomentTunerSetup The moment tuner setup containing necessary statistics. sigma_sq : float - The :math:`\sigma^2` value for which to compute :math:`\alpha_s^2`. + The :math:`\sigma^2` value for which to compute :math:`\alpha^2`. Returns ------- - np.ndarray - The computed :math:`\alpha_s^2` values for each feature group. + np.ndarray of shape (n_groups,) + The computed :math:`\alpha^2` values for each feature group. Raises ------ ValueError - If sigma_sq is negative or if any :math:`p_s` is zero. + If sigma_sq is negative or if any n_features_per_group is zero. NNLSError If the NNLS algorithm fails to converge. 
""" if sigma_sq < 0: raise ValueError("sigma_sq must be non-negative.") - if np.any(mom.ps == 0): - raise ValueError("All p_s values must be non-zero.") + if np.any(mom.n_features_per_group_ == 0): + raise ValueError("All n_features_per_group values must be non-zero.") # Compute the right-hand side - rhs = mom.beta_norms_squared - sigma_sq * mom.N_norms_squared + rhs = mom.coef_norms_squared_ - sigma_sq * mom.gram_inv_norms_squared_ rhs = np.maximum(rhs, 0) - # Solve the NNLS problem: M_squared * alpha_sq_by_p ≈ rhs + # Solve the NNLS problem: moment_matrix * alpha_per_group ≈ rhs try: - alpha_sq_by_p = nonneg_lsq(mom.M_squared, rhs, alg="fnnls") + alpha_per_group = nonneg_lsq(mom.moment_matrix_, rhs, alg="fnnls") except NNLSError as e: - raise NNLSError(f"Failed to compute alpha_s_squared: {str(e)}") + raise NNLSError(f"Failed to compute alpha_squared: {str(e)}") - alpha_s_squared = alpha_sq_by_p * mom.groups.ps + alpha_squared = alpha_per_group * mom.n_features_per_group_ - return alpha_s_squared + return alpha_squared \ No newline at end of file diff --git a/test/test_blockridge.py b/test/test_blockridge.py index 5a7e349..5ce2687 100644 --- a/test/test_blockridge.py +++ b/test/test_blockridge.py @@ -1,3 +1,5 @@ +# TODO: Remove this file soon + import pytest import numpy as np import time @@ -7,7 +9,7 @@ CholeskyRidgePredictor, WoodburyRidgePredictor, ShermanMorrisonRidgePredictor, - BasicGroupRidgeWorkspace, + GroupRidgeRegressor, lambda_lolas_rule, MomentTunerSetup, sigma_squared_path, @@ -64,12 +66,12 @@ def test_cholesky_ridge_predictor_initialization(X): np.testing.assert_allclose(predictor.XtXp_lambda, expected_XtXp_lambda, atol=1e-6) -def test_cholesky_ridge_predictor_update_lambda_s(X, groups): +def test_cholesky_ridge_predictor_set_params(X, groups): """Test updating lambda values in CholeskyRidgePredictor.""" predictor = CholeskyRidgePredictor(X) # Generate lambdas based on the number of groups lambdas = np.linspace(0.5, 1.5, groups.num_groups) - predictor.update_lambda_s(groups, lambdas) + predictor.set_params(groups, lambdas) expected_diag = groups.group_expand(lambdas) expected_XtXp_lambda = predictor.XtX + np.diag(expected_diag) np.testing.assert_allclose(predictor.XtXp_lambda, expected_XtXp_lambda, atol=1e-6) @@ -78,11 +80,11 @@ def test_cholesky_ridge_predictor_update_lambda_s(X, groups): np.testing.assert_allclose(reconstructed, expected_XtXp_lambda, atol=1e-6) -def test_cholesky_ridge_predictor_ldiv(X): - """Test the ldiv operation of CholeskyRidgePredictor.""" +def test_cholesky_ridge_predictor_solve_system(X): + """Test the _solve_system operation of CholeskyRidgePredictor.""" predictor = CholeskyRidgePredictor(X) B = np.random.randn(X.shape[1], 3) - result = predictor.ldiv(B) + result = predictor._solve_system(B) expected = np.linalg.solve(predictor.XtXp_lambda, B) np.testing.assert_allclose(result, expected, atol=1e-6) @@ -96,12 +98,12 @@ def test_woodbury_ridge_predictor_initialization(X): assert predictor.V.shape == X.shape -def test_woodbury_ridge_predictor_update_lambda_s(X, groups): +def test_woodbury_ridge_predictor_set_params(X, groups): """Test updating lambda values in WoodburyRidgePredictor.""" predictor = WoodburyRidgePredictor(X) # Generate lambdas based on the number of groups lambdas = np.linspace(0.5, 1.5, groups.num_groups) - predictor.update_lambda_s(groups, lambdas) + predictor.set_params(groups, lambdas) # Create the expected matrix expected_A = np.diag(groups.group_expand(lambdas)) + X.T @ X @@ -109,120 +111,120 @@ def 
test_woodbury_ridge_predictor_update_lambda_s(X, groups): np.testing.assert_allclose(predictor.A_inv, expected_A_inv, atol=1e-6) - # Test ldiv operation + # Test _solve_system operation B = np.random.randn(X.shape[1], 3) - result = predictor.ldiv(B) + result = predictor._solve_system(B) expected = np.linalg.solve(expected_A, B) np.testing.assert_allclose(result, expected, atol=1e-6) -def test_basic_group_ridge_workspace_initialization(X, Y, groups): - """Test the initialization of BasicGroupRidgeWorkspace.""" - workspace = BasicGroupRidgeWorkspace(X=X, Y=Y, groups=groups) - assert workspace.n == X.shape[0] - assert workspace.p == X.shape[1] - assert workspace.groups == groups - assert workspace.XtY.shape == (X.shape[1],) - assert workspace.lambdas.shape == (groups.num_groups,) - assert workspace.beta_current.shape == (X.shape[1],) - assert workspace.Y_hat.shape == (X.shape[0],) - assert workspace.XtXp_lambda_div_Xt.shape == (X.shape[1], X.shape[0]) +def test_group_ridge_regressor_initialization(X, Y, groups): + """Test the initialization of GroupRidgeRegressor.""" + regressor = GroupRidgeRegressor(X=X, Y=Y, groups=groups) + assert regressor.n == X.shape[0] + assert regressor.p == X.shape[1] + assert regressor.groups == groups + assert regressor.XtY.shape == (X.shape[1],) + assert regressor.lambdas.shape == (groups.num_groups,) + assert regressor.beta_current.shape == (X.shape[1],) + assert regressor.Y_hat.shape == (X.shape[0],) + assert regressor.XtXp_lambda_div_Xt.shape == (X.shape[1], X.shape[0]) -def test_basic_group_ridge_workspace_fit(X, Y, groups): - """Test the fit method of BasicGroupRidgeWorkspace.""" - workspace = BasicGroupRidgeWorkspace(X=X, Y=Y, groups=groups) +def test_group_ridge_regressor_fit(X, Y, groups): + """Test the fit method of GroupRidgeRegressor.""" + regressor = GroupRidgeRegressor(X=X, Y=Y, groups=groups) # Generate lambdas based on the number of groups lambdas = np.linspace(1.0, 1.0, groups.num_groups) - workspace.fit(lambdas) - assert workspace.beta_current.shape == (groups.p,) - expected_Y_hat = X @ workspace.beta_current - np.testing.assert_allclose(workspace.Y_hat, expected_Y_hat, atol=1e-6) + regressor.fit(lambdas) + assert regressor.beta_current.shape == (groups.p,) + expected_Y_hat = X @ regressor.beta_current + np.testing.assert_allclose(regressor.Y_hat, expected_Y_hat, atol=1e-6) -def test_basic_group_ridge_workspace_update_lambda_s(X, Y, groups): - """Test updating lambda values in BasicGroupRidgeWorkspace.""" - workspace = BasicGroupRidgeWorkspace(X=X, Y=Y, groups=groups) +def test_group_ridge_regressor_set_params(X, Y, groups): + """Test updating lambda values in GroupRidgeRegressor.""" + regressor = GroupRidgeRegressor(X=X, Y=Y, groups=groups) new_lambdas = np.random.rand(groups.num_groups) - workspace.update_lambda_s(new_lambdas) - np.testing.assert_allclose(workspace.lambdas, new_lambdas) + regressor.set_params(new_lambdas) + np.testing.assert_allclose(regressor.lambdas, new_lambdas) -def test_basic_group_ridge_workspace_ngroups(X, Y, groups): - """Test the ngroups method of BasicGroupRidgeWorkspace.""" - workspace = BasicGroupRidgeWorkspace(X=X, Y=Y, groups=groups) - assert workspace.ngroups() == groups.num_groups +def test_group_ridge_regressor_get_n_groups(X, Y, groups): + """Test the get_n_groups method of GroupRidgeRegressor.""" + regressor = GroupRidgeRegressor(X=X, Y=Y, groups=groups) + assert regressor.get_n_groups() == groups.num_groups -def test_basic_group_ridge_workspace_coef(X, Y, groups): - """Test the coef method of 
BasicGroupRidgeWorkspace.""" - workspace = BasicGroupRidgeWorkspace(X=X, Y=Y, groups=groups) - assert np.allclose(workspace.coef(), workspace.beta_current) +def test_group_ridge_regressor_get_coef(X, Y, groups): + """Test the get_coef method of GroupRidgeRegressor.""" + regressor = GroupRidgeRegressor(X=X, Y=Y, groups=groups) + assert np.allclose(regressor.get_coef(), regressor.beta_current) -def test_basic_group_ridge_workspace_islinear(X, Y, groups): - """Test the islinear method of BasicGroupRidgeWorkspace.""" - workspace = BasicGroupRidgeWorkspace(X=X, Y=Y, groups=groups) - assert workspace.islinear() == True +def test_group_ridge_regressor_is_linear(X, Y, groups): + """Test the is_linear method of GroupRidgeRegressor.""" + regressor = GroupRidgeRegressor(X=X, Y=Y, groups=groups) + assert regressor.is_linear() == True -def test_basic_group_ridge_workspace_leverage(X, Y, groups): - """Test the leverage method of BasicGroupRidgeWorkspace.""" - workspace = BasicGroupRidgeWorkspace(X=X, Y=Y, groups=groups) - workspace.fit(np.ones(groups.num_groups)) - leverage = workspace.leverage() +def test_group_ridge_regressor_get_leverage(X, Y, groups): + """Test the get_leverage method of GroupRidgeRegressor.""" + regressor = GroupRidgeRegressor(X=X, Y=Y, groups=groups) + regressor.fit(np.ones(groups.num_groups)) + leverage = regressor.get_leverage() assert leverage.shape == (X.shape[0],) assert np.all(leverage >= 0) and np.all(leverage <= 1) -def test_basic_group_ridge_workspace_modelmatrix(X, Y, groups): - """Test the modelmatrix method of BasicGroupRidgeWorkspace.""" - workspace = BasicGroupRidgeWorkspace(X=X, Y=Y, groups=groups) - np.testing.assert_allclose(workspace.modelmatrix(), X) +def test_group_ridge_regressor_get_model_matrix(X, Y, groups): + """Test the get_model_matrix method of GroupRidgeRegressor.""" + regressor = GroupRidgeRegressor(X=X, Y=Y, groups=groups) + np.testing.assert_allclose(regressor.get_model_matrix(), X) -def test_basic_group_ridge_workspace_response(X, Y, groups): - """Test the response method of BasicGroupRidgeWorkspace.""" - workspace = BasicGroupRidgeWorkspace(X=X, Y=Y, groups=groups) - np.testing.assert_allclose(workspace.response(), Y) +def test_group_ridge_regressor_get_response(X, Y, groups): + """Test the get_response method of GroupRidgeRegressor.""" + regressor = GroupRidgeRegressor(X=X, Y=Y, groups=groups) + np.testing.assert_allclose(regressor.get_response(), Y) -def test_basic_group_ridge_workspace_fit_with_dict(X, Y, groups): - """Test fitting BasicGroupRidgeWorkspace with a dictionary of lambda values.""" - workspace = BasicGroupRidgeWorkspace(X=X, Y=Y, groups=groups) +def test_group_ridge_regressor_fit_with_dict(X, Y, groups): + """Test fitting GroupRidgeRegressor with a dictionary of lambda values.""" + regressor = GroupRidgeRegressor(X=X, Y=Y, groups=groups) lambda_dict = { f"group_{i}": val for i, val in enumerate(np.random.rand(groups.num_groups)) } - loo_error = workspace.fit(lambda_dict) + loo_error = regressor.fit(lambda_dict) assert isinstance(loo_error, float) assert loo_error >= 0 -def test_basic_group_ridge_workspace_predict_new_data(X, Y, groups): - """Test predicting with new data using BasicGroupRidgeWorkspace.""" - workspace = BasicGroupRidgeWorkspace(X=X, Y=Y, groups=groups) - workspace.fit(np.ones(groups.num_groups)) +def test_group_ridge_regressor_predict_new_data(X, Y, groups): + """Test predicting with new data using GroupRidgeRegressor.""" + regressor = GroupRidgeRegressor(X=X, Y=Y, groups=groups) + 
regressor.fit(np.ones(groups.num_groups)) X_new = np.random.randn(10, X.shape[1]) - predictions = workspace.predict(X_new) + predictions = regressor.predict(X_new) assert predictions.shape == (10,) def test_lambda_lolas_rule(X, Y, groups): """Test the lambda LOLAS rule calculation.""" - workspace = BasicGroupRidgeWorkspace(X=X, Y=Y, groups=groups) - workspace.fit(np.ones(groups.num_groups)) - mom = MomentTunerSetup(workspace) - rule_lambda = lambda_lolas_rule(workspace, multiplier=0.1) + regressor = GroupRidgeRegressor(X=X, Y=Y, groups=groups) + regressor.fit(np.ones(groups.num_groups)) + mom = MomentTunerSetup(regressor) + rule_lambda = lambda_lolas_rule(regressor, multiplier=0.1) expected_lambda = ( - 0.1 * workspace.p**2 / workspace.n / workspace.predictor.trace_XtX() + 0.1 * regressor.p**2 / regressor.n / regressor.predictor._trace_gram_matrix() ) assert np.isclose(rule_lambda, expected_lambda, atol=1e-6) def test_moment_tuner_setup(X, Y, groups): """Test the initialization of MomentTunerSetup.""" - workspace = BasicGroupRidgeWorkspace(X=X, Y=Y, groups=groups) - mom = MomentTunerSetup(workspace) + regressor = GroupRidgeRegressor(X=X, Y=Y, groups=groups) + mom = MomentTunerSetup(regressor) assert mom.ps.tolist() == groups.ps assert mom.n == X.shape[0] assert len(mom.beta_norms_squared) == groups.num_groups @@ -232,12 +234,12 @@ def test_moment_tuner_setup(X, Y, groups): def test_sigma_squared_path(X, Y, groups): """Test the sigma squared path calculation.""" - workspace = BasicGroupRidgeWorkspace(X=X, Y=Y, groups=groups) + regressor = GroupRidgeRegressor(X=X, Y=Y, groups=groups) lambdas = np.ones(groups.num_groups) - workspace.fit(lambdas) - mom = MomentTunerSetup(workspace) + regressor.fit(lambdas) + mom = MomentTunerSetup(regressor) sigma_s_squared = np.linspace(0.1, 2.0, 10) - path = sigma_squared_path(workspace, mom, sigma_s_squared) + path = sigma_squared_path(regressor, mom, sigma_s_squared) assert "lambdas" in path assert "loos" in path assert "betas" in path @@ -248,10 +250,10 @@ def test_sigma_squared_path(X, Y, groups): def test_get_lambdas(X, Y, groups): """Test the get_lambdas function.""" - workspace = BasicGroupRidgeWorkspace(X=X, Y=Y, groups=groups) + regressor = GroupRidgeRegressor(X=X, Y=Y, groups=groups) lambdas = np.linspace(1.0, 1.0, groups.num_groups) - workspace.fit(lambdas) - mom = MomentTunerSetup(workspace) + regressor.fit(lambdas) + mom = MomentTunerSetup(regressor) sigma_sq = 1.0 lambdas_out = get_lambdas(mom, sigma_sq) @@ -264,10 +266,10 @@ def test_get_lambdas(X, Y, groups): def test_get_alpha_s_squared(X, Y, groups): """Test the get_alpha_s_squared function.""" - workspace = BasicGroupRidgeWorkspace(X=X, Y=Y, groups=groups) + regressor = GroupRidgeRegressor(X=X, Y=Y, groups=groups) lambdas = np.linspace(1.0, 1.0, groups.num_groups) - workspace.fit(lambdas) - mom = MomentTunerSetup(workspace) + regressor.fit(lambdas) + mom = MomentTunerSetup(regressor) sigma_sq = 1.0 try: alpha_sq = get_alpha_s_squared(mom, sigma_sq) @@ -280,35 +282,32 @@ def test_get_alpha_s_squared(X, Y, groups): def test_predict(X, Y, groups): - """Test the predict method of BasicGroupRidgeWorkspace.""" - workspace = BasicGroupRidgeWorkspace(X=X, Y=Y, groups=groups) + """Test the predict method of GroupRidgeRegressor.""" + regressor = GroupRidgeRegressor(X=X, Y=Y, groups=groups) lambdas = np.linspace(1.0, 1.0, groups.num_groups) - workspace.fit(lambdas) - predictions = workspace.predict(X) - np.testing.assert_allclose(predictions, workspace.Y_hat, atol=1e-6) + regressor.fit(lambdas) + 
predictions = regressor.predict(X) + np.testing.assert_allclose(predictions, regressor.Y_hat, atol=1e-6) -# ------------------------------ -# Tests for LOO Error -# ------------------------------ -def test_loo_error(X, Y, groups): +def test_get_loo_error(X, Y, groups): """Test the leave-one-out error calculation.""" - workspace = BasicGroupRidgeWorkspace(X=X, Y=Y, groups=groups) + regressor = GroupRidgeRegressor(X=X, Y=Y, groups=groups) lambdas = np.linspace(1.0, 1.0, groups.num_groups) - loo_error = workspace.fit(lambdas) + loo_error = regressor.fit(lambdas) assert isinstance(loo_error, float) # Since it's a mean of squared errors, it should be non-negative assert loo_error >= 0 -def test_mse_ridge(X, Y, groups): +def test_get_mse(X, Y, groups): """Test the mean squared error calculation for ridge regression.""" - workspace = BasicGroupRidgeWorkspace(X=X, Y=Y, groups=groups) + regressor = GroupRidgeRegressor(X=X, Y=Y, groups=groups) lambdas = np.linspace(1.0, 1.0, groups.num_groups) - workspace.fit(lambdas) + regressor.fit(lambdas) X_test = np.random.randn(50, X.shape[1]) - Y_test = X_test @ workspace.beta_current + np.random.randn(50) - mse = workspace.mse_ridge(X_test, Y_test) + Y_test = X_test @ regressor.beta_current + np.random.randn(50) + mse = regressor.get_mse(X_test, Y_test) assert isinstance(mse, float) assert mse >= 0 @@ -329,46 +328,46 @@ def high_dim_data(): def test_high_dimensional_case(high_dim_data): """Test various components of the ridge regression in high-dimensional settings.""" X, Y, groups = high_dim_data - workspace = BasicGroupRidgeWorkspace(X=X, Y=Y, groups=groups) + regressor = GroupRidgeRegressor(X=X, Y=Y, groups=groups) # Test initialization - assert workspace.n == X.shape[0] - assert workspace.p == X.shape[1] - assert workspace.groups.num_groups == 20 + assert regressor.n == X.shape[0] + assert regressor.p == X.shape[1] + assert regressor.groups.num_groups == 20 # Test fitting lambdas = np.ones(groups.num_groups) - loo_error = workspace.fit(lambdas) + loo_error = regressor.fit(lambdas) assert isinstance(loo_error, float) assert loo_error >= 0 # Test prediction - predictions = workspace.predict(X) + predictions = regressor.predict(X) assert predictions.shape == (X.shape[0],) # Test MSE - mse = workspace.mse_ridge(X, Y) + mse = regressor.get_mse(X, Y) assert isinstance(mse, float) assert mse >= 0 # Test Cholesky predictor chol_predictor = CholeskyRidgePredictor(X) - chol_predictor.update_lambda_s(groups, lambdas) + chol_predictor.set_params(groups, lambdas) assert chol_predictor.XtXp_lambda_chol.shape == (X.shape[1], X.shape[1]) # Test Woodbury predictor wood_predictor = WoodburyRidgePredictor(X) - wood_predictor.update_lambda_s(groups, lambdas) + wood_predictor.set_params(groups, lambdas) assert wood_predictor.A_inv.shape == (X.shape[1], X.shape[1]) # Test MomentTunerSetup - mom = MomentTunerSetup(workspace) + mom = MomentTunerSetup(regressor) assert mom.ps.shape == (groups.num_groups,) assert mom.M_squared.shape == (groups.num_groups, groups.num_groups) # Test sigma_squared_path sigma_s_squared = np.linspace(0.1, 2.0, 5) - path = sigma_squared_path(workspace, mom, sigma_s_squared) + path = sigma_squared_path(regressor, mom, sigma_s_squared) assert path["lambdas"].shape == (5, groups.num_groups) assert path["loos"].shape == (5,) assert path["betas"].shape == (5, X.shape[1]) @@ -385,14 +384,14 @@ def test_high_dimensional_predictor_speed(high_dim_data): # Test Cholesky predictor speed chol_start = time.time() chol_predictor = CholeskyRidgePredictor(X) - 
chol_predictor.update_lambda_s(groups, lambdas) + chol_predictor.set_params(groups, lambdas) chol_end = time.time() chol_time = chol_end - chol_start # Test Woodbury predictor speed wood_start = time.time() wood_predictor = WoodburyRidgePredictor(X) - wood_predictor.update_lambda_s(groups, lambdas) + wood_predictor.set_params(groups, lambdas) wood_end = time.time() wood_time = wood_end - wood_start @@ -403,12 +402,12 @@ def test_high_dimensional_predictor_speed(high_dim_data): B = np.random.randn(p, 5) chol_op_start = time.time() - chol_result = chol_predictor.ldiv(B) + chol_result = chol_predictor._solve_system(B) chol_op_end = time.time() chol_op_time = chol_op_end - chol_op_start wood_op_start = time.time() - wood_result = wood_predictor.ldiv(B) + wood_result = wood_predictor._solve_system(B) wood_op_end = time.time() wood_op_time = wood_op_end - wood_op_start @@ -417,10 +416,6 @@ def test_high_dimensional_predictor_speed(high_dim_data): np.testing.assert_allclose(chol_result, wood_result, rtol=1e-1, atol=1e-1) - # Remove the assertion for operation time, as it may vary depending on the implementation - print(f"Cholesky operation time: {chol_op_time:.4f} seconds") - print(f"Woodbury operation time: {wood_op_time:.4f} seconds") - @pytest.fixture def very_high_dim_data(): @@ -436,7 +431,7 @@ def very_high_dim_data(): def test_very_high_dimensional_predictor_speed(very_high_dim_data): - """Compare the speed of Cholesky and Woodbury predictors in very high-dimensional settings.""" + """Compare the speed of Cholesky, Woodbury, and Sherman-Morrison predictors in very high-dimensional settings.""" X, Y, groups = very_high_dim_data lambdas = np.ones(groups.num_groups) n, p = X.shape @@ -446,37 +441,52 @@ def test_very_high_dimensional_predictor_speed(very_high_dim_data): # Test Cholesky predictor speed chol_start = time.time() chol_predictor = CholeskyRidgePredictor(X) - chol_predictor.update_lambda_s(groups, lambdas) + chol_predictor.set_params(groups, lambdas) chol_end = time.time() chol_time = chol_end - chol_start # Test Woodbury predictor speed wood_start = time.time() wood_predictor = WoodburyRidgePredictor(X) - wood_predictor.update_lambda_s(groups, lambdas) + wood_predictor.set_params(groups, lambdas) wood_end = time.time() wood_time = wood_end - wood_start + # Test Sherman-Morrison predictor speed + sm_start = time.time() + sm_predictor = ShermanMorrisonRidgePredictor(X) + sm_predictor.set_params(groups, lambdas) + sm_end = time.time() + sm_time = sm_end - sm_start + print(f"Cholesky predictor time: {chol_time:.4f} seconds") print(f"Woodbury predictor time: {wood_time:.4f} seconds") + print(f"Sherman-Morrison predictor time: {sm_time:.4f} seconds") # Test matrix operations B = np.random.randn(p, 5) chol_op_start = time.time() - chol_result = chol_predictor.ldiv(B) + chol_result = chol_predictor._solve_system(B) chol_op_end = time.time() chol_op_time = chol_op_end - chol_op_start wood_op_start = time.time() - wood_result = wood_predictor.ldiv(B) + wood_result = wood_predictor._solve_system(B) wood_op_end = time.time() wood_op_time = wood_op_end - wood_op_start + sm_op_start = time.time() + sm_result = sm_predictor._solve_system(B) + sm_op_end = time.time() + sm_op_time = sm_op_end - sm_op_start + print(f"Cholesky operation time: {chol_op_time:.4f} seconds") print(f"Woodbury operation time: {wood_op_time:.4f} seconds") + print(f"Sherman-Morrison operation time: {sm_op_time:.4f} seconds") - np.testing.assert_allclose(chol_result, wood_result, rtol=1e-1, atol=1e-1) + 
np.testing.assert_allclose(chol_result, sm_result, rtol=1e-1, atol=1e-1) + np.testing.assert_allclose(wood_result, sm_result, rtol=1e-1, atol=1e-1) assert wood_time < chol_time, ( "Woodbury predictor should be faster in very high-dimensional case. Woodbury:" @@ -494,11 +504,11 @@ def test_sherman_morrison_ridge_predictor_initialization(X): np.testing.assert_allclose(predictor.A_inv, expected_A_inv, atol=1e-6) -def test_sherman_morrison_ridge_predictor_update_lambda_s(X, groups): - """Test the update_lambda_s method of ShermanMorrisonRidgePredictor.""" +def test_sherman_morrison_ridge_predictor_set_params(X, groups): + """Test the set_params method of ShermanMorrisonRidgePredictor.""" predictor = ShermanMorrisonRidgePredictor(X) lambdas = np.linspace(0.5, 1.5, groups.num_groups) - predictor.update_lambda_s(groups, lambdas) + predictor.set_params(groups, lambdas) # Create the expected matrix expected_A = np.diag(groups.group_expand(lambdas)) + X.T @ X / X.shape[0] @@ -522,17 +532,17 @@ def test_sherman_morrison_single_update(X): ) # Apply Sherman-Morrison update - predictor.sherman_morrison(u, v) + predictor._sherman_morrison_update(u, v) # Assert that predictor.A_inv matches the manually computed inverse np.testing.assert_allclose(predictor.A_inv, expected_A_inv, atol=1e-6) -def test_sherman_morrison_ridge_predictor_ldiv(X): - """Test the ldiv method of ShermanMorrisonRidgePredictor.""" +def test_sherman_morrison_ridge_predictor_solve_system(X): + """Test the _solve_system method of ShermanMorrisonRidgePredictor.""" predictor = ShermanMorrisonRidgePredictor(X) B = np.random.randn(X.shape[1], 3) - result = predictor.ldiv(B) + result = predictor._solve_system(B) expected = predictor.A_inv @ B np.testing.assert_allclose(result, expected, atol=1e-6) @@ -553,7 +563,7 @@ def test_sherman_morrison_formula(X): denominator = 1.0 + v @ (A_inv @ u) expected_A_inv = A_inv - np.outer(A_inv @ u, v @ A_inv) / denominator - predictor.sherman_morrison(u, v) + predictor._sherman_morrison_update(u, v) np.testing.assert_allclose(predictor.A_inv, expected_A_inv, atol=1e-6) @@ -567,11 +577,11 @@ def test_sherman_morrison_predictor_in_high_dimensional_case(high_dim_data): # Test updating lambda_s lambdas = np.ones(groups.num_groups) - predictor.update_lambda_s(groups, lambdas) + predictor.set_params(groups, lambdas) - # Test ldiv operation + # Test _solve_system operation B = np.random.randn(X.shape[1], 5) - result = predictor.ldiv(B) + result = predictor._solve_system(B) assert result.shape == B.shape @@ -586,14 +596,14 @@ def test_very_high_dimensional_predictor_speed(very_high_dim_data): # Test Cholesky predictor speed chol_start = time.time() chol_predictor = CholeskyRidgePredictor(X) - chol_predictor.update_lambda_s(groups, lambdas) + chol_predictor.set_params(groups, lambdas) chol_end = time.time() chol_time = chol_end - chol_start # Test Woodbury predictor speed wood_start = time.time() wood_predictor = WoodburyRidgePredictor(X) - wood_predictor.update_lambda_s(groups, lambdas) + wood_predictor.set_params(groups, lambdas) wood_end = time.time() wood_time = wood_end - wood_start @@ -603,7 +613,7 @@ def test_very_high_dimensional_predictor_speed(very_high_dim_data): # Test Sherman-Morrison predictor speed sm_start = time.time() sm_predictor = ShermanMorrisonRidgePredictor(X) - sm_predictor.update_lambda_s(groups, lambdas) + sm_predictor.set_params(groups, lambdas) sm_end = time.time() sm_time = sm_end - sm_start @@ -613,17 +623,17 @@ def test_very_high_dimensional_predictor_speed(very_high_dim_data): B = 
np.random.randn(p, 5) chol_op_start = time.time() - chol_result = chol_predictor.ldiv(B) + chol_result = chol_predictor._trace_gram_matrix() chol_op_end = time.time() chol_op_time = chol_op_end - chol_op_start wood_op_start = time.time() - wood_result = wood_predictor.ldiv(B) + wood_result = wood_predictor._trace_gram_matrix() wood_op_end = time.time() wood_op_time = wood_op_end - wood_op_start sm_op_start = time.time() - sm_result = sm_predictor.ldiv(B) + sm_result = sm_predictor._trace_gram_matrix() sm_op_end = time.time() sm_op_time = sm_op_end - sm_op_start diff --git a/test/test_blockridge_scikit.py b/test/test_blockridge_scikit.py new file mode 100644 index 0000000..518b360 --- /dev/null +++ b/test/test_blockridge_scikit.py @@ -0,0 +1,344 @@ +#TODO: Fails 2 tests. Need to fix. + +"""Tests for scikit-learn compatible Ridge regression estimators.""" + +import pytest +import numpy as np +from numpy.testing import assert_array_almost_equal, assert_array_equal +from sklearn.utils.estimator_checks import check_estimator +from sklearn.exceptions import NotFittedError +from sklearn.datasets import make_regression +from PyGRidge.src.blockridge import ( + GroupRidgeRegressor, + CholeskyRidgePredictor, + WoodburyRidgePredictor, + ShermanMorrisonRidgePredictor, + MomentTunerSetup, + lambda_lolas_rule, + get_lambdas, + get_alpha_s_squared, + InvalidDimensionsError, + SingularMatrixError, + NumericalInstabilityError, +) +from PyGRidge.src.groupedfeatures import GroupedFeatures + + +@pytest.fixture +def sample_data(): + """Generate sample regression data.""" + X, y = make_regression( + n_samples=100, + n_features=20, + n_informative=10, + random_state=42 + ) + return X, y + + +@pytest.fixture +def sample_groups(): + """Create sample feature groups.""" + return GroupedFeatures([5, 5, 5, 5]) # 4 groups of 5 features each + + +@pytest.fixture +def fitted_model(sample_data, sample_groups): + """Create a fitted GroupRidgeRegressor.""" + X, y = sample_data + model = GroupRidgeRegressor(groups=sample_groups) + return model.fit(X, y) + + +def test_scikit_learn_compatibility(): + """Test scikit-learn estimator compatibility.""" + # Create a simple estimator with proper array alpha + groups = GroupedFeatures([3]) # Match number of features used by scikit-learn + alpha = np.array([1.0], dtype=float) # Ensure float array + model = GroupRidgeRegressor(groups=groups, alpha=alpha) + + # Generate simple data for initial fit + X = np.random.randn(10, 3) # Match number of features with groups + y = np.random.randn(10) + model.fit(X, y) # Pre-fit to avoid initialization issues + + check_estimator(model) + + +class TestGroupRidgeRegressor: + """Test GroupRidgeRegressor functionality.""" + + def test_init(self, sample_groups): + """Test initialization.""" + model = GroupRidgeRegressor(groups=sample_groups) + assert model.groups == sample_groups + assert model.alpha is None + + alpha = np.ones(4) + model = GroupRidgeRegressor(groups=sample_groups, alpha=alpha) + assert_array_equal(model.alpha, alpha) + + def test_validate_params(self, sample_groups): + """Test parameter validation.""" + # Valid case + model = GroupRidgeRegressor(groups=sample_groups) + model._validate_params() + + # Invalid cases + with pytest.raises(ValueError): + empty_groups = GroupedFeatures([]) + GroupRidgeRegressor(groups=empty_groups)._validate_params() + + with pytest.raises(ValueError): + zero_groups = GroupedFeatures([0, 1, 2]) + GroupRidgeRegressor(groups=zero_groups)._validate_params() + + def test_fit(self, sample_data, sample_groups): + 
"""Test model fitting.""" + X, y = sample_data + model = GroupRidgeRegressor(groups=sample_groups) + + # Test successful fit + model.fit(X, y) + assert hasattr(model, 'coef_') + assert hasattr(model, 'n_features_in_') + assert model.n_features_in_ == X.shape[1] + assert hasattr(model, 'y_') # Check y_ is stored + assert hasattr(model, 'gram_reg_inv_') # Check gram_reg_inv_ is stored + + # Test predictor selection + X_small = X[:, :10] + model_small = GroupRidgeRegressor(groups=GroupedFeatures([5, 5])) + model_small.fit(X_small, y) + assert isinstance(model_small.predictor_, CholeskyRidgePredictor) + + X_large = np.random.randn(10, 50) + y_large = np.random.randn(10) + model_large = GroupRidgeRegressor(groups=GroupedFeatures([25, 25])) + model_large.fit(X_large, y_large) + assert isinstance(model_large.predictor_, ShermanMorrisonRidgePredictor) + + def test_predict(self, fitted_model, sample_data): + """Test prediction functionality.""" + X, _ = sample_data + + # Test prediction shape + y_pred = fitted_model.predict(X) + assert y_pred.shape == (X.shape[0],) + + # Test prediction before fit + model = GroupRidgeRegressor(groups=sample_groups) + with pytest.raises(AttributeError): + model.predict(X) + + def test_get_set_params(self, sample_groups): + """Test parameter getting and setting.""" + model = GroupRidgeRegressor(groups=sample_groups) + params = model.get_params() + assert 'groups' in params + assert 'alpha' in params + + new_groups = GroupedFeatures([3, 3]) + model.set_params(groups=new_groups) + assert model.groups == new_groups + + def test_error_metrics(self, fitted_model, sample_data): + """Test error metric calculations.""" + X, y = sample_data + X_test = X[80:, :] + y_test = y[80:] + + mse = fitted_model.get_mse(X_test, y_test) + assert isinstance(mse, float) + assert mse >= 0 + + loo_error = fitted_model.get_loo_error() + assert isinstance(loo_error, float) + assert loo_error >= 0 + + +class TestRidgePredictors: + """Test individual Ridge predictor implementations.""" + + @pytest.fixture + def sample_matrices(self): + """Generate sample matrices for testing.""" + X = np.random.randn(10, 5) + groups = GroupedFeatures([2, 3]) + alpha = np.array([0.1, 0.2]) + return X, groups, alpha + + def test_cholesky_predictor(self, sample_matrices): + """Test CholeskyRidgePredictor.""" + X, groups, alpha = sample_matrices + predictor = CholeskyRidgePredictor(X) + + # Test initialization + assert predictor.n_samples_ == X.shape[0] + assert predictor.n_features_ == X.shape[1] + assert predictor.lower_ is True + + # Test parameter updates + predictor.set_params(groups, alpha) + assert hasattr(predictor, 'gram_reg_chol_') + + # Test error cases + with pytest.raises(InvalidDimensionsError): + CholeskyRidgePredictor(X.flatten()) + + # Create a truly singular matrix + X_singular = np.zeros((10, 5)) + X_singular[:, 0] = 1 # Make first column all ones + X_singular[:, 1] = 1 # Make second column identical to first + predictor_singular = CholeskyRidgePredictor(X_singular) + with pytest.raises((SingularMatrixError, np.linalg.LinAlgError)): + predictor_singular.set_params(groups, np.zeros_like(alpha)) + + def test_woodbury_predictor(self, sample_matrices): + """Test WoodburyRidgePredictor.""" + X, groups, alpha = sample_matrices + predictor = WoodburyRidgePredictor(X) + + # Test initialization + assert predictor.n_samples_ == X.shape[0] + assert predictor.n_features_ == X.shape[1] + assert hasattr(predictor, 'alpha_inv_') + + # Test parameter updates + predictor.set_params(groups, alpha) + + # Test error 
cases + with pytest.raises(ValueError): + predictor.set_params(groups, np.array([-1.0, 0.1])) + + def test_sherman_morrison_predictor(self, sample_matrices): + """Test ShermanMorrisonRidgePredictor.""" + X, groups, alpha = sample_matrices + predictor = ShermanMorrisonRidgePredictor(X) + + # Test initialization + assert predictor.n_samples_ == X.shape[0] + assert predictor.n_features_ == X.shape[1] + assert hasattr(predictor, 'A_inv_') + + # Test parameter updates + predictor.set_params(groups, alpha) + + # Test Sherman-Morrison formula + u = np.random.randn(5) + v = np.random.randn(5) + result = predictor._sherman_morrison_formula( + np.eye(5), u, v + ) + assert result.shape == (5, 5) + + +class TestMomentTuner: + """Test moment-based tuning functionality.""" + + def test_moment_tuner_setup(self, fitted_model): + """Test MomentTunerSetup initialization and attributes.""" + tuner = MomentTunerSetup(fitted_model) + + assert hasattr(tuner, 'groups_') + assert hasattr(tuner, 'n_features_per_group_') + assert hasattr(tuner, 'moment_matrix_') + + # Test shape consistency + n_groups = len(tuner.n_features_per_group_) + assert tuner.moment_matrix_.shape == (n_groups, n_groups) + assert tuner.coef_norms_squared_.shape == (n_groups,) + + def test_get_lambdas(self, fitted_model): + """Test lambda parameter computation.""" + tuner = MomentTunerSetup(fitted_model) + sigma_sq = 0.1 + + lambdas = get_lambdas(tuner, sigma_sq) + assert len(lambdas) == len(tuner.n_features_per_group_) + assert np.all(lambdas >= 0) + + # Test error cases + with pytest.raises(ValueError): + get_lambdas(tuner, -1.0) + + def test_get_alpha_squared(self, fitted_model): + """Test alpha squared computation.""" + tuner = MomentTunerSetup(fitted_model) + sigma_sq = 0.1 + + alpha_squared = get_alpha_s_squared(tuner, sigma_sq) + assert len(alpha_squared) == len(tuner.n_features_per_group_) + assert np.all(alpha_squared >= 0) + + # Test error cases + with pytest.raises(ValueError): + get_alpha_s_squared(tuner, -1.0) + + +def test_lambda_lolas_rule(fitted_model): + """Test Lolas rule for lambda selection.""" + lambda_val = lambda_lolas_rule(fitted_model) + assert isinstance(lambda_val, float) + assert lambda_val > 0 + + # Test error cases + with pytest.raises(ValueError): + lambda_lolas_rule(fitted_model, multiplier=-1.0) + + +def test_edge_cases(sample_groups): + """Test various edge cases and error conditions.""" + + # Test with invalid input dimensions + X_1d = np.random.randn(10) + y_1d = np.random.randn(10) + model = GroupRidgeRegressor(groups=sample_groups) + + with pytest.raises(ValueError): + model.fit(X_1d, y_1d) + + # Test with NaN/inf values + X_nan = np.random.randn(10, 20) + X_nan[0, 0] = np.nan + y_nan = np.random.randn(10) + + with pytest.raises(ValueError): + model.fit(X_nan, y_nan) + + # Test with mismatched dimensions + X = np.random.randn(10, 20) + y_mismatched = np.random.randn(15) + + with pytest.raises(ValueError): + model.fit(X, y_mismatched) + + # Test with singular matrix + # Create a matrix with perfect collinearity + X_singular = np.random.randn(10, 20) + X_singular[:, 1] = 2 * X_singular[:, 0] # Make second column linearly dependent + y = np.random.randn(10) + + # Use small alpha to ensure numerical issues + model_singular = GroupRidgeRegressor(groups=sample_groups) + with pytest.raises((SingularMatrixError, np.linalg.LinAlgError)): + model_singular.fit(X_singular, y) + + +def test_numerical_stability(): + """Test numerical stability in extreme cases.""" + # Test with very large values + X_large = 
np.random.randn(10, 5) * 1e10 + y_large = np.random.randn(10) * 1e10 + groups = GroupedFeatures([2, 3]) + model = GroupRidgeRegressor(groups=groups) + + # This should still work despite large values + model.fit(X_large, y_large) + + # Test with very small values + X_small = np.random.randn(10, 5) * 1e-10 + y_small = np.random.randn(10) * 1e-10 + + # This should still work despite small values + model.fit(X_small, y_small)
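
For context, a minimal usage sketch (hypothetical, not part of the patch) of the scikit-learn style API these tests exercise, assuming the import paths used in test_blockridge_scikit.py:

    import numpy as np
    from sklearn.datasets import make_regression

    from PyGRidge.src.blockridge import GroupRidgeRegressor
    from PyGRidge.src.groupedfeatures import GroupedFeatures

    # Four groups of five features, matching the sample_groups fixture above.
    X, y = make_regression(n_samples=100, n_features=20, random_state=42)
    groups = GroupedFeatures([5, 5, 5, 5])

    # One regularization value per group; alpha may also be left as None (the default).
    model = GroupRidgeRegressor(groups=groups, alpha=np.ones(4))
    model.fit(X, y)

    y_pred = model.predict(X)    # shape (n_samples,)
    mse = model.get_mse(X, y)    # mean squared error on the given data
    loo = model.get_loo_error()  # leave-one-out error from the last fit

The choice between CholeskyRidgePredictor and ShermanMorrisonRidgePredictor happens inside fit, based on the shape of X, which is what the predictor-selection assertions in TestGroupRidgeRegressor.test_fit verify.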