Refactor linear regression tests and add hypothesis test
RichieHakim committed Apr 14, 2024
1 parent 0a6dde7 commit fec8edc
Showing 2 changed files with 269 additions and 20 deletions.
205 changes: 185 additions & 20 deletions bnpm/linear_regression.py
@@ -119,11 +119,40 @@ def ridge(X, y, lam=1, add_bias_terms=False):

class LinearRegression_sk(sklearn.base.BaseEstimator, sklearn.base.RegressorMixin):
"""
- My own version of a base linear regression estimator using sklearn's format.
+ Implements a basic linear regression estimator following the scikit-learn estimator interface.
RH 2023
Attributes:
coef_ (Union[np.ndarray, torch.Tensor]):
Coefficients of the linear model.
intercept_ (Union[float, np.ndarray, torch.Tensor]):
Intercept of the linear model.
"""

- def get_backend_namespace(self, X, y=None):
+ def get_backend_namespace(
+ self,
+ X: Union[np.ndarray, torch.Tensor],
+ y: Optional[Union[np.ndarray, torch.Tensor]] = None
+ ) -> Dict[str, Callable]:
"""
Determines the appropriate numerical backend (NumPy or PyTorch) based on
the type of input data.
Args:
X (Union[np.ndarray, torch.Tensor]):
Input features array or tensor.
y (Optional[Union[np.ndarray, torch.Tensor]]):
Optional target array or tensor. (Default is ``None``)
Raises:
NotImplementedError:
If the input type is neither a NumPy array nor a PyTorch tensor.
Returns:
Dict[str, Callable]:
Dictionary containing backend-specific functions and
constructors.
"""
if isinstance(X, torch.Tensor):
backend = 'torch'
if y is not None:
@@ -158,14 +187,50 @@ def get_backend_namespace(self, X, y=None):

return ns

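For illustration, a minimal sketch of the dispatch pattern this method implements. The exact contents of `ns` are collapsed above; the keys `cat`, `eye`, `inv`, `ones`, and `zeros` are assumed here from their use in `prefit` further down, so treat this as a hypothetical re-implementation rather than the committed code:

import numpy as np
import torch

def backend_namespace_sketch(X):
    # Pick constructors that match the backend of X.
    if isinstance(X, torch.Tensor):
        return {
            'cat': torch.cat,
            'eye': lambda n: torch.eye(n, dtype=X.dtype, device=X.device),
            'inv': torch.linalg.inv,
            'ones': lambda shape: torch.ones(shape, dtype=X.dtype, device=X.device),
            'zeros': lambda shape: torch.zeros(shape, dtype=X.dtype, device=X.device),
        }
    elif isinstance(X, np.ndarray):
        return {'cat': np.concatenate, 'eye': np.eye, 'inv': np.linalg.inv,
                'ones': np.ones, 'zeros': np.zeros}
    raise NotImplementedError('Expected a NumPy array or a PyTorch tensor.')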
- def predict(self, X):
+ def predict(self, X: Union[np.ndarray, torch.Tensor]) -> Union[np.ndarray, torch.Tensor]:
"""
Predicts the target values using the linear model.
Args:
X (Union[np.ndarray, torch.Tensor]):
Input features array or tensor.
Returns:
Union[np.ndarray, torch.Tensor]:
Predicted target values.
"""
if X.ndim==1:
X = X[:, None]

return X @ self.coef_ + self.intercept_

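As a usage sketch (the arrays are illustrative; the import path is inferred from this diff):

import numpy as np
from bnpm.linear_regression import OLS

X = np.random.randn(50, 3)
y = X @ np.array([1.0, -2.0, 0.5]) + 0.3
model = OLS(fit_intercept=True).fit(X, y)
y_hat = model.predict(X)   # identical to X @ model.coef_ + model.intercept_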
- def score(self, X, y, sample_weight=None):
+ def score(
+ self,
+ X: Union[np.ndarray, torch.Tensor],
+ y: Union[np.ndarray, torch.Tensor],
+ sample_weight: Optional[np.ndarray] = None
+ ) -> float:
"""
Calculates the coefficient of determination R^2 of the prediction.
Args:
X (Union[np.ndarray, torch.Tensor]):
Input features for which to predict the targets.
y (Union[np.ndarray, torch.Tensor]):
True target values.
sample_weight (Optional[np.ndarray]):
Sample weights. (Default is ``None``)
Raises:
NotImplementedError:
If the input type is neither a NumPy array nor a PyTorch tensor.
Returns:
float:
R^2 score of the prediction. Computed with sklearn.metrics.r2_score
when the input is a NumPy array; an equivalent calculation is used
when the input is a PyTorch tensor.
"""
y_pred = self.predict(X)
if isinstance(X, torch.Tensor):
if sample_weight is not None:
@@ -211,25 +276,50 @@ def check_and_convert(x):

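The tensor branch of `score` is collapsed above. A hedged sketch of the standard R^2 computation it presumably mirrors (this is the textbook formula, not code copied from the commit):

import torch

def r2_score_torch(y_true, y_pred):
    # R^2 = 1 - SS_res / SS_tot; matches sklearn.metrics.r2_score for 1-D targets.
    ss_res = torch.sum((y_true - y_pred) ** 2)
    ss_tot = torch.sum((y_true - torch.mean(y_true)) ** 2)
    return float(1.0 - ss_res / ss_tot)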
class Ridge(LinearRegression_sk):
"""
- Estimator class for Ridge regression.
- Based on the Cholesky's closed form solution.
+ Implements Ridge regression using the closed-form (analytic) solution,
+ computed via Cholesky decomposition. \n
+ NOTE: This class does not check for NaNs, Infs, singular matrices, etc. \n
+ This method is fast and accurate for small to medium-sized datasets. When
+ ``n_features`` is large and/or the problem is underdetermined (``n_samples`` <
+ ``n_features``), the solution will likely diverge from the sklearn solution. \n
RH 2023
Args:
alpha (float):
- Regularization parameter. Usually ~1e5
+ The regularization strength, which must be a positive float.
+ Regularizes the estimate to prevent overfitting by constraining the
+ size of the coefficients. Usually ~1e-5. (Default is 1)
fit_intercept (bool):
- If True, add a column of ones to X.
- Theta will not contain the bias term.
- X_precompute (ndarray):
- X array to precompute X.T @ X + alpha*I
+ Whether to calculate the intercept for this model. If set to
+ ``False``, no intercept will be used in calculations (i.e., data is
+ expected to be centered). (Default is ``True``)
+ X_precompute (Optional[np.ndarray]):
+ Optionally, an X array used to precompute \((X^T X + \alpha I)^{-1} X^T\)
+ and speed up calculations. (Default is ``None``)
Attributes:
alpha (float):
Regularization parameter.
fit_intercept (bool):
Specifies if the intercept should be calculated.
X_precompute (bool):
Indicates whether \((X^T X + \alpha I)^{-1} X^T\) was precomputed.
iXTXaeXT (Optional[np.ndarray]):
Stores the precomputed \((X^T X + \alpha I)^{-1} X^T\) if available,
otherwise ``None``.
"""
def __init__(
self,
alpha=1,
- fit_intercept=False,
+ fit_intercept=True,
X_precompute=None,
**kwargs,
):
"""
Initializes the Ridge regression model with the specified alpha,
fit_intercept, and optional X_precompute.
"""
self.alpha = alpha
self.fit_intercept = fit_intercept

@@ -241,6 +331,18 @@ def __init__(
self.iXTXaeXT = None

def prefit(self, X):
"""
Precomputes \((X^T X + \alpha I)^{-1} X^T\) to be used in the `fit`
method.
Args:
X (np.ndarray):
The input features matrix.
Returns:
np.ndarray:
Precomputed \((X^T X + \alpha I)^{-1} X^T\).
"""
ns = self.get_backend_namespace(X=X)
cat, eye, inv, ones, zeros = (ns[key] for key in ['cat', 'eye', 'inv', 'ones', 'zeros'])
if self.fit_intercept:
@@ -250,6 +352,19 @@ def prefit(self, X):
return inv_XT_X_plus_alpha_eye_XT

def fit(self, X, y):
"""
Fits the Ridge regression model to the data.
Args:
X (np.ndarray):
Training data features.
y (np.ndarray):
Target values.
Returns:
Ridge:
The instance of this Ridge model.
"""
self.n_features_in_ = X.shape[1]

ns = self.get_backend_namespace(X=X, y=y)
@@ -269,25 +384,46 @@ def fit(self, X, y):

return self


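For reference, the estimate that `Ridge.prefit` and `Ridge.fit` assemble above can be written in a few lines of NumPy. A sketch under the same conventions (a bias column is appended when fitting an intercept; note that this formulation also penalizes the bias term, unlike sklearn's Ridge):

import numpy as np

def ridge_closed_form(X, y, alpha=1.0, fit_intercept=True):
    # theta = (X^T X + alpha*I)^(-1) X^T y
    if fit_intercept:
        X = np.concatenate([X, np.ones((X.shape[0], 1))], axis=1)
    theta = np.linalg.inv(X.T @ X + alpha * np.eye(X.shape[1])) @ X.T @ y
    return theta   # last entry is the bias term when an intercept column was appended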
class OLS(LinearRegression_sk):
"""
- Estimator class for OLS regression.
- Based on the closed form OLS solution.
+ Implements Ordinary Least Squares (OLS) regression via the closed-form
+ solution. \n
+ NOTE: This class does not check for NaNs, Infs, singular matrices, etc. \n
+ This method is fast and accurate for small to medium-sized datasets. When
+ ``n_features`` is large and/or the problem is underdetermined (``n_samples`` <=
+ ``n_features``), the solution will likely diverge from the sklearn solution. \n
RH 2023
Args:
fit_intercept (bool):
- If True, add a column of ones to X.
- Theta will not contain the bias term.
- X_precompute (ndarray):
- X array to precompute inv(X.T @ X) @ X.T
+ Specifies whether to calculate the intercept for this model. If set
+ to ``True``, a column of ones is added to the feature matrix \(X\).
+ (Default is ``True``)
+ X_precompute (Optional[np.ndarray]):
+ Optionally, an X array used to precompute \((X^T X)^{-1} X^T\) and
+ speed up the calculations. (Default is ``None``)
Attributes:
fit_intercept (bool):
Specifies if the intercept should be calculated.
X_precompute (bool):
Indicates whether \((X^T X)^{-1} X^T\) was precomputed.
iXTXXT (Optional[np.ndarray]):
Stores the precomputed \((X^T X)^{-1} X^T\) if provided,
otherwise ``None``.
"""

def __init__(
self,
- fit_intercept=False,
+ fit_intercept=True,
X_precompute=None,
**kwargs,
):
"""
Initializes the OLS regression model with the specified `fit_intercept`
and an optional `X_precompute`.
"""
self.fit_intercept = fit_intercept

self.X_precompute = True if X_precompute is not None else False
@@ -298,17 +434,46 @@ def __init__(
self.iXTXXT = None

def prefit(self, X):
"""
Precomputes \((X^T X)^{-1} X^T\) to be used in the `fit` method.
Args:
X (np.ndarray):
The input features matrix.
Returns:
np.ndarray:
Precomputed \((X^T X)^{-1} X^T\).
"""
ns = self.get_backend_namespace(X=X)
cat, eye, inv, ones, zeros = (ns[key] for key in ['cat', 'eye', 'inv', 'ones', 'zeros'])
if self.fit_intercept:
X = cat((X, ones((X.shape[0], 1))), axis=1)

inv_XT_X_XT = inv(X.T @ X) @ X.T
return inv_XT_X_XT

def fit(self, X, y):
"""
Fits the OLS regression model to the data.
Args:
X (np.ndarray):
Training data features.
y (np.ndarray):
Target values.
Returns:
OLS:
The instance of this OLS model.
"""
self.n_features_in_ = X.shape[1]

## Give a UserWarning if n_features >= n_samples
if X.shape[1] >= X.shape[0]:
import warnings
warnings.warn('OLS solution is expected to diverge from sklearn solution when n_features >= n_samples')

ns = self.get_backend_namespace(X=X, y=y)
zeros = ns['zeros']

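Context for the UserWarning above: when ``n_samples`` <= ``n_features``, \(X^T X\) is singular, so the explicit inverse is ill-conditioned, whereas sklearn's LinearRegression uses an SVD-based least-squares solve that returns the minimum-norm solution. A sketch of that more stable alternative (illustrative, not part of this commit):

import numpy as np

X = np.random.randn(10, 20)   # underdetermined: n_samples < n_features
y = np.random.randn(10)

# X.T @ X is 20x20 with rank <= 10 here, so np.linalg.inv(X.T @ X) is unreliable.
# An SVD-based solve returns the minimum-norm least-squares solution instead:
theta, residuals, rank, singular_values = np.linalg.lstsq(X, y, rcond=None)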
84 changes: 84 additions & 0 deletions bnpm/tests/test_regression.py
@@ -0,0 +1,84 @@
import pytest
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
from sklearn.datasets import make_regression
import numpy as np
from hypothesis import given, strategies as st
import hypothesis

from ..linear_regression import OLS

# Basic functionality and accuracy
def test_basic_functionality():
X, y = make_regression(n_samples=100, n_features=90, noise=0.1)
model_sklearn = LinearRegression().fit(X, y)
model_ols = OLS().fit(X, y)

np.testing.assert_allclose(model_sklearn.coef_, model_ols.coef_, rtol=1e-5)
np.testing.assert_allclose(model_sklearn.intercept_, model_ols.intercept_, rtol=1e-5)
predictions_sklearn = model_sklearn.predict(X)
predictions_ols = model_ols.predict(X)
assert mean_squared_error(predictions_sklearn, predictions_ols) < 1e-5

# High dimensionality
def test_high_dimensionality():
"""
OLS solution is expected to diverge when n_features >= n_samples. Expect a
warning.
"""
X, y = make_regression(n_samples=100, n_features=100, noise=0.1)
with pytest.warns(UserWarning):
model_ols = OLS().fit(X, y)

# Single feature test
def test_single_feature():
X, y = make_regression(n_samples=100, n_features=1, noise=0.1)
model_sklearn = LinearRegression().fit(X, y)
model_ols = OLS().fit(X, y)

np.testing.assert_allclose(model_sklearn.coef_, model_ols.coef_, rtol=1e-5)
np.testing.assert_allclose(model_sklearn.intercept_, model_ols.intercept_, rtol=1e-5)

# Zero variation test
def test_zero_variation():
X = np.ones((100, 1)) # no variation in X
y = np.random.randn(100)
with pytest.raises(Exception):
model_ols = OLS().fit(X, y)

# Extreme values test
def test_extreme_values():
X = np.array([
[1e10, 1e11, 1e12],
[1e-10, 1e-11, 1e-12],
[1e-10, 1e11, 1e12]
])
y = np.array([1, 2, 3])

model_sklearn = LinearRegression().fit(X, y)
model_ols = OLS().fit(X, y)

np.testing.assert_allclose(model_sklearn.coef_, model_ols.coef_, atol=1e-5)
np.testing.assert_allclose(model_sklearn.intercept_, model_ols.intercept_, atol=1e-5)

# Hypothesis test
@given(
n=st.integers(min_value=2, max_value=100),
m=st.integers(min_value=2, max_value=99),
noise=st.floats(min_value=0.01, max_value=1.0),
)
@hypothesis.settings(max_examples=10)
def test_hypothesis(n, m, noise):
X, y = make_regression(n_samples=n, n_features=m, noise=noise)
model_sklearn = LinearRegression().fit(X, y)
if m >= n:
with pytest.warns(UserWarning):
model_ols = OLS().fit(X, y)
else:
model_ols = OLS().fit(X, y)

np.testing.assert_allclose(model_sklearn.coef_, model_ols.coef_, rtol=1e-5)
np.testing.assert_allclose(model_sklearn.intercept_, model_ols.intercept_, rtol=1e-5)
predictions_sklearn = model_sklearn.predict(X)
predictions_ols = model_ols.predict(X)
assert mean_squared_error(predictions_sklearn, predictions_ols) < 1e-5
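A natural companion (not in this commit) would be an analogous check for the `Ridge` class. A sketch, with ``fit_intercept=False`` on both sides so the penalties match exactly (sklearn's Ridge does not penalize its intercept, while the column-of-ones formulation above does):

import numpy as np
from sklearn.linear_model import Ridge as RidgeSklearn
from sklearn.datasets import make_regression

from ..linear_regression import Ridge

def test_ridge_matches_sklearn():
    X, y = make_regression(n_samples=100, n_features=20, noise=0.1)
    model_sklearn = RidgeSklearn(alpha=1.0, fit_intercept=False, solver='cholesky').fit(X, y)
    model_ridge = Ridge(alpha=1.0, fit_intercept=False).fit(X, y)
    np.testing.assert_allclose(model_sklearn.coef_, model_ridge.coef_, rtol=1e-4)

The suite runs with, e.g., `pytest bnpm/tests/test_regression.py` (path assumed from the diff header).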
