From fec8edcfbd5966cd4c425b17932b0a47011ca299 Mon Sep 17 00:00:00 2001
From: RichieHakim
Date: Sun, 14 Apr 2024 16:19:24 -0400
Subject: [PATCH] Refactor linear regression tests and add hypothesis test

---
 bnpm/linear_regression.py     | 205 ++++++++++++++++++++++++++++++----
 bnpm/tests/test_regression.py |  84 ++++++++++++++
 2 files changed, 269 insertions(+), 20 deletions(-)
 create mode 100644 bnpm/tests/test_regression.py

diff --git a/bnpm/linear_regression.py b/bnpm/linear_regression.py
index 4e71754..ca6591e 100644
--- a/bnpm/linear_regression.py
+++ b/bnpm/linear_regression.py
@@ -119,11 +119,40 @@ def ridge(X, y, lam=1, add_bias_terms=False):
 
 class LinearRegression_sk(sklearn.base.BaseEstimator, sklearn.base.RegressorMixin):
     """
-    My own version of a base linear regression estimator using sklearn's format.
+    Implements a basic linear regression estimator following the scikit-learn
+    estimator interface.
 
     RH 2023
+
+    Attributes:
+        coef_ (Union[np.ndarray, torch.Tensor]):
+            Coefficients of the linear model.
+        intercept_ (Union[float, np.ndarray, torch.Tensor]):
+            Intercept of the linear model.
     """
-    def get_backend_namespace(self, X, y=None):
+    def get_backend_namespace(
+        self,
+        X: Union[np.ndarray, torch.Tensor],
+        y: Optional[Union[np.ndarray, torch.Tensor]] = None
+    ) -> Dict[str, Callable]:
+        """
+        Determines the appropriate numerical backend (NumPy or PyTorch) based
+        on the type of the input data.
+
+        Args:
+            X (Union[np.ndarray, torch.Tensor]):
+                Input features array or tensor.
+            y (Optional[Union[np.ndarray, torch.Tensor]]):
+                Optional target array or tensor. (Default is ``None``)
+
+        Raises:
+            NotImplementedError:
+                If the input type is neither a NumPy array nor a PyTorch tensor.
+
+        Returns:
+            Dict[str, Callable]:
+                Dictionary containing backend-specific functions and
+                constructors.
+        """
         if isinstance(X, torch.Tensor):
             backend = 'torch'
             if y is not None:
@@ -158,14 +187,50 @@
 
         return ns
 
-    def predict(self, X):
+    def predict(self, X: Union[np.ndarray, torch.Tensor]) -> Union[np.ndarray, torch.Tensor]:
+        """
+        Predicts the target values using the linear model.
+
+        Args:
+            X (Union[np.ndarray, torch.Tensor]):
+                Input features array or tensor.
+
+        Returns:
+            Union[np.ndarray, torch.Tensor]:
+                Predicted target values.
+        """
         if X.ndim==1:
             X = X[:, None]
         return X @ self.coef_ + self.intercept_
 
-    def score(self, X, y, sample_weight=None):
+    def score(
+        self,
+        X: Union[np.ndarray, torch.Tensor],
+        y: Union[np.ndarray, torch.Tensor],
+        sample_weight: Optional[np.ndarray] = None
+    ) -> float:
+        """
+        Calculates the coefficient of determination R^2 of the prediction.
+
+        Args:
+            X (Union[np.ndarray, torch.Tensor]):
+                Input features for which to predict the targets.
+            y (Union[np.ndarray, torch.Tensor]):
+                True target values.
+            sample_weight (Optional[np.ndarray]):
+                Sample weights. (Default is ``None``)
+
+        Raises:
+            NotImplementedError:
+                If the input type is neither a NumPy array nor a PyTorch tensor.
+
+        Returns:
+            float:
+                R^2 score indicating the prediction accuracy. Uses
+                sklearn.metrics.r2_score if the input is a NumPy array, and an
+                equivalent calculation if the input is a PyTorch tensor.
+        """
         y_pred = self.predict(X)
         if isinstance(X, torch.Tensor):
             if sample_weight is not None:
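
Note (not part of the patch): the backend dispatch above means one estimator instance can predict and score on either NumPy arrays or torch tensors. A minimal sketch of that behavior, assuming the `bnpm` package is importable; `coef_`/`intercept_` are populated by hand here since the base class leaves fitting to its subclasses:

    import numpy as np
    import torch

    from bnpm.linear_regression import LinearRegression_sk

    # Populate fitted state manually to exercise predict()/score().
    model = LinearRegression_sk()
    model.coef_ = np.array([2.0, -1.0])
    model.intercept_ = 0.5

    X = np.random.randn(100, 2)
    y = X @ model.coef_ + model.intercept_
    print(model.score(X, y))  # 1.0, via sklearn.metrics.r2_score

    # The same instance works on torch tensors: score() takes the
    # torch R^2 branch instead of calling sklearn.
    model.coef_ = torch.as_tensor(model.coef_)
    model.intercept_ = torch.tensor(0.5)
    Xt = torch.as_tensor(X)
    yt = Xt @ model.coef_ + model.intercept_
    print(model.score(Xt, yt))
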
@@ -211,25 +276,50 @@ def check_and_convert(x):
 
 class Ridge(LinearRegression_sk):
     """
-    Estimator class for Ridge regression.
-    Based on the Cholesky's closed form solution.
+    Implements Ridge regression based on the closed-form ('analytic') solution
+    via Cholesky decomposition. \n
+    NOTE: This class does not check for NaNs, Infs, singular matrices, etc. \n
+    This method is fast and accurate for small to medium-sized datasets. When
+    ``n_features`` is large and/or the problem is underdetermined
+    (``n_samples`` < ``n_features``), the solution will likely diverge from the
+    sklearn solution. \n
+    RH 2023
 
     Args:
         alpha (float):
-            Regularization parameter. Usually ~1e5
+            The regularization strength, which must be a positive float.
+            Regularizes the estimate to prevent overfitting by constraining the
+            size of the coefficients. Usually ~1e5. (Default is 1)
         fit_intercept (bool):
-            If True, add a column of ones to X.
-            Theta will not contain the bias term.
-        X_precompute (ndarray):
-            X array to precompute X.T @ X + alpha*I
+            Whether to calculate the intercept for this model. If set to
+            ``False``, no intercept will be used in calculations (i.e., data is
+            expected to be centered). (Default is ``True``)
+        X_precompute (Optional[np.ndarray]):
+            Optionally, the feature matrix \(X\) can be passed ahead of time to
+            precompute \((X^T X + \alpha I)^{-1} X^T\). (Default is ``None``)
+
+    Attributes:
+        alpha (float):
+            Regularization parameter.
+        fit_intercept (bool):
+            Specifies if the intercept should be calculated.
+        X_precompute (bool):
+            Indicates whether \((X^T X + \alpha I)^{-1} X^T\) was precomputed.
+        iXTXaeXT (Optional[np.ndarray]):
+            Stores the precomputed \((X^T X + \alpha I)^{-1} X^T\) if
+            ``X_precompute`` was given, otherwise ``None``.
     """
     def __init__(
         self,
         alpha=1,
-        fit_intercept=False,
+        fit_intercept=True,
         X_precompute=None,
         **kwargs,
     ):
+        """
+        Initializes the Ridge regression model with the specified alpha,
+        fit_intercept, and optional X_precompute.
+        """
         self.alpha = alpha
         self.fit_intercept = fit_intercept
@@ -241,6 +331,18 @@ def __init__(
             self.iXTXaeXT = None
 
     def prefit(self, X):
+        """
+        Precomputes \((X^T X + \alpha I)^{-1} X^T\) for use in the `fit`
+        method.
+
+        Args:
+            X (np.ndarray):
+                The input features matrix.
+
+        Returns:
+            np.ndarray:
+                Precomputed \((X^T X + \alpha I)^{-1} X^T\).
+        """
         ns = self.get_backend_namespace(X=X)
         cat, eye, inv, ones, zeros = (ns[key] for key in ['cat', 'eye', 'inv', 'ones', 'zeros'])
         if self.fit_intercept:
@@ -250,6 +352,19 @@
         return inv_XT_X_plus_alpha_eye_XT
 
     def fit(self, X, y):
+        """
+        Fits the Ridge regression model to the data.
+
+        Args:
+            X (np.ndarray):
+                Training data features.
+            y (np.ndarray):
+                Target values.
+
+        Returns:
+            Ridge:
+                The instance of this Ridge model.
+        """
         self.n_features_in_ = X.shape[1]
 
         ns = self.get_backend_namespace(X=X, y=y)
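
Note (not part of the patch): a sanity check on the closed form documented above. With ``fit_intercept=False`` (so no bias column is appended), `Ridge.fit` should reproduce \(\hat{\theta} = (X^T X + \alpha I)^{-1} X^T y\). A minimal sketch, assuming `bnpm` is importable:

    import numpy as np

    from bnpm.linear_regression import Ridge

    alpha = 1.0
    X = np.random.randn(100, 10)
    y = X @ np.random.randn(10) + 0.1 * np.random.randn(100)

    # Closed-form ridge solution, computed directly.
    theta = np.linalg.inv(X.T @ X + alpha * np.eye(X.shape[1])) @ X.T @ y

    model = Ridge(alpha=alpha, fit_intercept=False).fit(X, y)
    np.testing.assert_allclose(model.coef_, theta, rtol=1e-6)

    # Passing X up front precomputes (X^T X + alpha*I)^{-1} X^T once,
    # which fit() then reuses (per the X_precompute docstring above).
    model_pre = Ridge(alpha=alpha, fit_intercept=False, X_precompute=X).fit(X, y)
    np.testing.assert_allclose(model_pre.coef_, theta, rtol=1e-6)
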
@@ -269,25 +384,46 @@
 
         return self
 
+
 class OLS(LinearRegression_sk):
     """
-    Estimator class for OLS regression.
-    Based on the closed form OLS solution.
+    Implements Ordinary Least Squares (OLS) regression. \n
+    NOTE: This class does not check for NaNs, Infs, singular matrices, etc. \n
+    This method is fast and accurate for small to medium-sized datasets. When
+    ``n_features`` is large and/or the problem is underdetermined
+    (``n_samples`` <= ``n_features``), the solution will likely diverge from
+    the sklearn solution. \n
+    RH 2023
 
     Args:
         fit_intercept (bool):
-            If True, add a column of ones to X.
-            Theta will not contain the bias term.
-        X_precompute (ndarray):
-            X array to precompute inv(X.T @ X) @ X.T
+            Specifies whether to calculate the intercept for this model. If set
+            to ``True``, a column of ones is added to the feature matrix \(X\).
+            (Default is ``True``)
+        X_precompute (Optional[np.ndarray]):
+            Optionally, the feature matrix \(X\) can be provided ahead of time
+            to precompute \((X^T X)^{-1} X^T\) and speed up the calculations.
+            (Default is ``None``)
+
+    Attributes:
+        fit_intercept (bool):
+            Specifies if the intercept should be calculated.
+        X_precompute (bool):
+            Indicates whether \((X^T X)^{-1} X^T\) was precomputed.
+        iXTXXT (Optional[np.ndarray]):
+            Stores the precomputed \((X^T X)^{-1} X^T\) if provided, otherwise
+            ``None``.
     """
     def __init__(
         self,
-        fit_intercept=False,
+        fit_intercept=True,
         X_precompute=None,
         **kwargs,
    ):
+        """
+        Initializes the OLS regression model with the specified `fit_intercept`
+        and an optional `X_precompute`.
+        """
         self.fit_intercept = fit_intercept
 
         self.X_precompute = True if X_precompute is not None else False
@@ -298,17 +434,46 @@ def __init__(
             self.iXTXXT = None
 
     def prefit(self, X):
+        """
+        Precomputes \((X^T X)^{-1} X^T\) for use in the `fit` method.
+
+        Args:
+            X (np.ndarray):
+                The input features matrix.
+
+        Returns:
+            np.ndarray:
+                Precomputed \((X^T X)^{-1} X^T\).
+        """
         ns = self.get_backend_namespace(X=X)
         cat, eye, inv, ones, zeros = (ns[key] for key in ['cat', 'eye', 'inv', 'ones', 'zeros'])
         if self.fit_intercept:
             X = cat((X, ones((X.shape[0], 1))), axis=1)
-
+
         inv_XT_X_XT = inv(X.T @ X) @ X.T
         return inv_XT_X_XT
 
     def fit(self, X, y):
+        """
+        Fits the OLS regression model to the data.
+
+        Args:
+            X (np.ndarray):
+                Training data features.
+            y (np.ndarray):
+                Target values.
+
+        Returns:
+            OLS:
+                The instance of this OLS model.
+        """
         self.n_features_in_ = X.shape[1]
 
+        ## Give a UserWarning if n_features >= n_samples
+        if X.shape[1] >= X.shape[0]:
+            import warnings
+            warnings.warn('OLS solution is expected to diverge from sklearn solution when n_features >= n_samples')
+
         ns = self.get_backend_namespace(X=X, y=y)
         zeros = ns['zeros']
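
Note (not part of the patch): the new UserWarning in `OLS.fit` fires whenever ``n_features >= n_samples``. A minimal sketch of triggering and catching it; the square \(X\) stays invertible here because the bias column is disabled:

    import warnings

    import numpy as np

    from bnpm.linear_regression import OLS

    X = np.random.randn(50, 50)  # n_features == n_samples
    y = np.random.randn(50)

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter('always')
        OLS(fit_intercept=False).fit(X, y)

    assert any(issubclass(w.category, UserWarning) for w in caught)
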
+ """ + X, y = make_regression(n_samples=100, n_features=100, noise=0.1) + with pytest.warns(UserWarning): + model_ols = OLS().fit(X, y) + +# Single feature test +def test_single_feature(): + X, y = make_regression(n_samples=100, n_features=1, noise=0.1) + model_sklearn = LinearRegression().fit(X, y) + model_ols = OLS().fit(X, y) + + np.testing.assert_allclose(model_sklearn.coef_, model_ols.coef_, rtol=1e-5) + np.testing.assert_allclose(model_sklearn.intercept_, model_ols.intercept_, rtol=1e-5) + +# Zero variation test +def test_zero_variation(): + X = np.ones((100, 1)) # no variation in X + y = np.random.randn(100) + with pytest.raises(Exception): + model_ols = OLS().fit(X, y) + +# Extreme values test +def test_extreme_values(): + X = np.array([ + [1e10, 1e11, 1e12], + [1e-10, 1e-11, 1e-12], + [1e-10, 1e11, 1e12] + ]) + y = np.array([1, 2, 3]) + + model_sklearn = LinearRegression().fit(X, y) + model_ols = OLS().fit(X, y) + + np.testing.assert_allclose(model_sklearn.coef_, model_ols.coef_, atol=1e-5) + np.testing.assert_allclose(model_sklearn.intercept_, model_ols.intercept_, atol=1e-5) + +# Hypothesis test +@given( + n=st.integers(min_value=2, max_value=100), + m=st.integers(min_value=2, max_value=99), + noise=st.floats(min_value=0.01, max_value=1.0), +) +@hypothesis.settings(max_examples=10) +def test_hypothesis(n, m, noise): + X, y = make_regression(n_samples=n, n_features=m, noise=noise) + model_sklearn = LinearRegression().fit(X, y) + if m >= n: + with pytest.warns(UserWarning): + model_ols = OLS().fit(X, y) + else: + model_ols = OLS().fit(X, y) + + np.testing.assert_allclose(model_sklearn.coef_, model_ols.coef_, rtol=1e-5) + np.testing.assert_allclose(model_sklearn.intercept_, model_ols.intercept_, rtol=1e-5) + predictions_sklearn = model_sklearn.predict(X) + predictions_ols = model_ols.predict(X) + assert mean_squared_error(predictions_sklearn, predictions_ols) < 1e-5 \ No newline at end of file