From cf04d21cc68f4a991264a2d2f847cff54a1b3991 Mon Sep 17 00:00:00 2001
From: RichieHakim
Date: Fri, 29 Mar 2024 21:06:49 -0400
Subject: [PATCH] New PCA implementation

---
 bnpm/decomposition.py  | 311 +++++++++++++++++++++++++++++++++++++++++
 bnpm/tests/test_PCA.py | 164 ++++++++++++++++++++++
 2 files changed, 475 insertions(+)
 create mode 100644 bnpm/tests/test_PCA.py

diff --git a/bnpm/decomposition.py b/bnpm/decomposition.py
index d0b6672..2eef40f 100644
--- a/bnpm/decomposition.py
+++ b/bnpm/decomposition.py
@@ -1,3 +1,6 @@
+from typing import Tuple, Optional
+import math
+
 import sklearn.decomposition
 import numpy as np
 import scipy.interpolate
@@ -100,6 +103,314 @@ def simple_pca(X , n_components=None , mean_sub=True, zscore=False, plot_pref=Fa
     return components , scores , decomp.explained_variance_ratio_
 
 
+def svd_flip(
+    u: torch.Tensor,
+    v: torch.Tensor
+) -> Tuple[torch.Tensor, torch.Tensor]:
+    """
+    Sign correction to ensure deterministic output from SVD.
+
+    The output from SVD does not have a unique sign. This function corrects the
+    sign of the output so that the SVD result is deterministic.
+
+    RH 2024
+
+    Args:
+        u (torch.Tensor):
+            The left singular vectors.
+        v (torch.Tensor):
+            The right singular vectors.
+
+    Returns:
+        (Tuple[torch.Tensor, torch.Tensor]):
+            u (torch.Tensor):
+                The corrected left singular vectors.
+            v (torch.Tensor):
+                The corrected right singular vectors.
+    """
+    as_tensor = lambda x: torch.as_tensor(x) if isinstance(x, np.ndarray) else x
+    u, v = (as_tensor(var) for var in (u, v))
+
+    max_abs_cols = torch.argmax(torch.abs(u), dim=0)
+    signs = torch.sign(u[max_abs_cols, range(u.shape[1])])
+    u *= signs
+    v *= signs.unsqueeze(-1)
+    return u, v
+
+
+class PCA(torch.nn.Module, sklearn.base.BaseEstimator, sklearn.base.TransformerMixin):
+    """
+    Principal Component Analysis (PCA) module.
+
+    This module performs PCA on the input data and returns the principal
+    components and the explained variance. The PCA is performed using the
+    singular value decomposition (SVD) method. This class follows sklearn's PCA
+    implementation and style and is subclassed from torch.nn.Module,
+    sklearn.base.BaseEstimator, and sklearn.base.TransformerMixin. The
+    decomposed variables (components, explained_variance, etc.) are stored as
+    buffers so that they are stored in the state_dict, respond to .to() and
+    .cuda(), and are saved when the model is saved.
+
+    RH 2024
+
+    Args:
+        n_components (Optional[int]):
+            Number of principal components to retain. If ``None``, all
+            components are retained.
+        center (bool):
+            If ``True``, the data is mean-subtracted before performing SVD.
+        zscale (bool):
+            If ``True``, the data is z-scored before performing SVD. Equivalent
+            to performing the eigenvalue decomposition on the correlation
+            matrix.
+        whiten (bool):
+            If ``True``, the principal components are divided by the square
+            root of the explained variance.
+        use_lowRank (bool):
+            If ``True``, a low-rank SVD is used. This is faster but less
+            accurate. Uses torch.svd_lowrank instead of torch.linalg.svd.
+        lowRank_niter (int):
+            Number of subspace iterations for the low-rank SVD. See
+            torch.svd_lowrank for more details.
+
+    Attributes:
+        n_components (int):
+            Number of principal components to retain.
+        whiten (bool):
+            If ``True``, the principal components are divided by the square
+            root of the explained variance.
+        device (str):
+            The device where the tensors will be stored.
+        dtype (torch.dtype):
+            The data type to use for the tensors.
+        components (torch.Tensor):
+            The principal components.
+        explained_variance (torch.Tensor):
+            The explained variance.
+        explained_variance_ratio (torch.Tensor):
+            The explained variance ratio.
+
+    Example:
+        .. highlight:: python
+        .. code-block:: python
+
+            X = torch.randn(100, 10)
+            pca = PCA(n_components=5)
+            pca.fit(X)
+            X_pca = pca.transform(X)
+    """
+    def __init__(
+        self,
+        n_components: Optional[int] = None,
+        center: bool = True,
+        zscale: bool = False,
+        whiten: bool = False,
+        use_lowRank: bool = False,
+        lowRank_niter: int = 2,
+    ):
+        """
+        Initializes the PCA module with the provided parameters.
+        """
+        super(PCA, self).__init__()
+        self.n_components = n_components
+        self.center = center
+        self.zscale = zscale
+        self.whiten = whiten
+        self.use_lowRank = use_lowRank
+        self.lowRank_niter = lowRank_niter
+
+    def prepare_input(
+        self,
+        X: torch.Tensor,
+        center: bool,
+        zscale: bool
+    ) -> torch.Tensor:
+        """
+        Prepares the input data for PCA.
+
+        Args:
+            X (torch.Tensor):
+                The input data to prepare.
+            center (bool):
+                If ``True``, the data is mean-subtracted.
+            zscale (bool):
+                If ``True``, the data is z-scored.
+
+        Returns:
+            (torch.Tensor):
+                The prepared input data.
+        """
+        if isinstance(X, np.ndarray):
+            X = torch.as_tensor(X)
+        assert isinstance(X, torch.Tensor), 'Input must be a torch.Tensor.'
+        X = X[:, None] if X.ndim == 1 else X
+        assert X.ndim == 2, 'Input must be 2D.'
+
+        ## Note: register_buffer overwrites an existing buffer with the same
+        ## name, so transform() recomputes mean_/std_ from the data it is given.
+        if center:
+            mean_ = torch.mean(X, dim=0)
+            X = X - mean_
+            self.register_buffer('mean_', mean_)
+        if zscale:
+            std_ = torch.std(X, dim=0)
+            X = X / std_
+            self.register_buffer('std_', std_)
+        return X
+
+    def fit(
+        self,
+        X: torch.Tensor,
+    ) -> "PCA":
+        """
+        Fits the PCA module to the input data.
+
+        Args:
+            X (torch.Tensor):
+                The input data to fit the PCA module to. Should be shape
+                (n_samples, n_features).
+
+        Returns:
+            self (PCA object):
+                Returns the fitted PCA object.
+        """
+        self._fit(X)
+        return self
+
+    def _fit(
+        self,
+        X: torch.Tensor
+    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
+        """
+        Fits the PCA module to the input data.
+
+        Args:
+            X (torch.Tensor):
+                The input data to fit the PCA module to. Should be shape
+                (n_samples, n_features).
+
+        Returns:
+            (Tuple[torch.Tensor, torch.Tensor, torch.Tensor]):
+                U (torch.Tensor):
+                    The left singular vectors. Shape (n_samples, n_components).
+                S (torch.Tensor):
+                    The singular values. Shape (n_components,).
+                V (torch.Tensor):
+                    The right singular vectors. Shape (n_features,
+                    n_components).
+        """
+        self.n_samples_, self.n_features_ = X.shape
+        self.n_components_ = min(self.n_components, self.n_features_) if self.n_components is not None else self.n_features_
+
+        X = self.prepare_input(X, center=self.center, zscale=self.zscale)
+        if self.use_lowRank:
+            U, S, V = torch.svd_lowrank(X, q=self.n_components_, niter=self.lowRank_niter)
+            Vh = V.T  ## torch.svd_lowrank returns V rather than Vh, so transpose it.
+        else:
+            U, S, Vh = torch.linalg.svd(X, full_matrices=False)  ## U: (n_samples, k), S: (k,), Vh: (k, n_features), where k = min(n_samples, n_features). Vh is already transposed.
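+        ## For the centered (and optionally scaled) X above, the SVD X = U @ diag(S) @ Vh
+        ## maps directly onto PCA: the rows of Vh are the principal axes (components_),
+        ## S**2 / (n_samples - 1) is the explained variance, and U * S (equivalently
+        ## X @ Vh.T) gives the projected scores.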
+        U, Vh = svd_flip(U, Vh)
+
+        explained_variance_ = S**2 / (self.n_samples_ - 1)
+        explained_variance_ratio_ = explained_variance_ / torch.sum(explained_variance_)
+
+        components_ = Vh[:self.n_components_]
+        singular_values_ = S[:self.n_components_]
+        explained_variance_ = explained_variance_[:self.n_components_]
+        explained_variance_ratio_ = explained_variance_ratio_[:self.n_components_]
+
+        [self.register_buffer(name, value) for name, value in zip(
+            ['components_', 'singular_values_', 'explained_variance_', 'explained_variance_ratio_'],
+            [components_, singular_values_, explained_variance_, explained_variance_ratio_]
+        )]
+
+        return U, S, Vh
+
+    def transform(
+        self,
+        X: torch.Tensor,
+        y: Optional[torch.Tensor] = None,
+    ) -> torch.Tensor:
+        """
+        Transforms the input data using the fitted PCA module.
+
+        Args:
+            X (torch.Tensor):
+                The input data to transform.
+            y (Optional[torch.Tensor]):
+                Ignored. This parameter exists to match the sklearn API.
+
+        Returns:
+            (torch.Tensor):
+                The transformed data. Will be shape (n_samples, n_components).
+        """
+        assert hasattr(self, 'components_'), 'PCA module must be fitted before transforming data.'
+        X = self.prepare_input(X, center=self.center, zscale=self.zscale)
+        X_transformed = X @ self.components_.T
+        if self.whiten:
+            X_transformed /= torch.sqrt(self.explained_variance_)
+        return X_transformed
+
+    def fit_transform(
+        self,
+        X: torch.Tensor
+    ) -> torch.Tensor:
+        """
+        Fits the PCA module to the input data and transforms the input data.
+
+        Args:
+            X (torch.Tensor):
+                The input data to fit the PCA module to and transform.
+
+        Returns:
+            (torch.Tensor):
+                The transformed data.
+        """
+        self.n_samples_, self.n_features_ = X.shape
+
+        U, S, V = self._fit(X)
+        U = U[:, :self.n_components_]
+
+        if self.whiten:
+            ## Whitened scores: U * sqrt(n_samples - 1) has unit sample variance per component.
+            U *= math.sqrt(self.n_samples_ - 1)
+        else:
+            ## Unwhitened scores: U * S equals the centered X @ components_.T
+            U *= S[:self.n_components_]
+
+        return U
+
+    def inverse_transform(
+        self,
+        X: torch.Tensor
+    ) -> torch.Tensor:
+        """
+        Inverse transforms the input data using the fitted PCA module.
+
+        Args:
+            X (torch.Tensor):
+                The input data to inverse transform. Should be shape
+                (n_samples, n_components).
+
+        Returns:
+            (torch.Tensor):
+                The inverse transformed data. Will be shape (n_samples,
+                n_features).
+        """
+        assert hasattr(self, 'components_'), 'PCA module must be fitted before inverse transforming data.'
+        X = self.prepare_input(X, center=False, zscale=False)
+
+        if self.whiten:
+            ## Unsqueeze so the per-component scaling broadcasts across the rows of components_.
+            scaled_components = torch.sqrt(self.explained_variance_)[:, None] * self.components_
+        else:
+            scaled_components = self.components_
+
+        X = X @ scaled_components
+
+        if self.zscale:
+            assert hasattr(self, 'std_'), 'self.zscale is True, but std_ is not found.'
+            X = X * self.std_
+        if self.center:
+            assert hasattr(self, 'mean_'), 'self.center is True, but mean_ is not found.'
+            X = X + self.mean_
+
+        return X
+
+
 def torch_pca(
     X_in,
     device='cpu',
diff --git a/bnpm/tests/test_PCA.py b/bnpm/tests/test_PCA.py
new file mode 100644
index 0000000..7f783e8
--- /dev/null
+++ b/bnpm/tests/test_PCA.py
@@ -0,0 +1,164 @@
+# test_PCA.py
+import torch
+import numpy as np
+from sklearn.decomposition import PCA as sklearnPCA
+from ..decomposition import PCA
+
+# Generate a synthetic dataset
+np.random.seed(42)
+torch.manual_seed(42)
+X_np = np.random.randn(100, 10).astype(np.float32)
+X_torch = torch.from_numpy(X_np)
+
+def test_fit_transform_equivalence():
+    n_components = 5
+    pca_sklearn = sklearnPCA(n_components=n_components, svd_solver='full').fit(X_np)
+    pca_torch = PCA(n_components=n_components).fit(X_torch)
+
+    # Transform the data using both PCA implementations
+    X_transformed_sklearn = pca_sklearn.transform(X_np)
+    X_transformed_torch = pca_torch.transform(X_torch).numpy()
+
+    # Test for equivalence of the transformed data
+    assert np.allclose(X_transformed_sklearn, X_transformed_torch, atol=1e-5)
+
+def test_explained_variance_ratio():
+    pca_torch = PCA(n_components=5)
+    pca_torch.fit(X_torch)
+
+    pca_sklearn = sklearnPCA(n_components=5, svd_solver='full').fit(X_np)
+
+    # Test for equivalence of explained variance ratio
+    assert np.allclose(pca_torch.explained_variance_ratio_.numpy(), pca_sklearn.explained_variance_ratio_, atol=1e-5)
+
+def test_inverse_transform():
+    # Retain all components so the reconstruction is exact up to float error.
+    pca_torch = PCA(n_components=None)
+    pca_torch.fit(X_torch)
+
+    X_transformed_torch = pca_torch.transform(X_torch)
+    X_inversed_torch = pca_torch.inverse_transform(X_transformed_torch).numpy()
+
+    # Test for recovery of the original data after inverse transformation
+    assert np.allclose(X_np, X_inversed_torch, atol=1e-5)
+
+def test_components_sign():
+    pca_torch = PCA(n_components=2)
+    pca_torch.fit(X_torch)
+    pca_sklearn = sklearnPCA(n_components=2, svd_solver='full').fit(X_np)
+
+    # svd_flip should make the component signs match sklearn's deterministic convention
+    assert np.allclose(pca_torch.components_.numpy(), pca_sklearn.components_, atol=1e-5), "Components' signs are not corrected properly."
+
+def test_low_rank_svd():
+    pca_low_rank = PCA(n_components=3, use_lowRank=True, lowRank_niter=5)
+    pca_full_rank = PCA(n_components=3, use_lowRank=False)
+
+    pca_low_rank.fit(X_torch)
+    pca_full_rank.fit(X_torch)
+
+    assert pca_low_rank.components_.shape == pca_full_rank.components_.shape, "Low-rank and full-rank PCA should produce components of the same shape."
+
+    # This test doesn't check for numerical equivalence, as low-rank approximations
+    # will differ, but it ensures that the method runs and produces output of the expected shape.
+
+def test_whitening_effect():
+    pca_whiten = PCA(n_components=5, whiten=True)
+    pca_whiten.fit(X_torch)
+
+    X_transformed = pca_whiten.transform(X_torch).numpy()
+    # After whitening, each principal component should have unit sample variance.
+    # Use ddof=1 because the explained variance uses an (n_samples - 1) denominator.
+    variances = np.var(X_transformed, axis=0, ddof=1)
+    assert np.allclose(variances, np.ones(variances.shape), atol=1e-5), "Whitened components do not have unit variance."
+
+def test_retain_all_components():
+    pca_all = PCA(n_components=None)  # Retain all components
+    pca_all.fit(X_torch)
+
+    expected_components = min(X_torch.shape)
+    assert pca_all.components_.shape[0] == expected_components, "PCA with n_components=None did not retain all components."
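+
+# A sketch added for illustration (not in the original patch): the class docstring
+# states that the fitted results are registered as buffers and therefore live in the
+# state_dict. The test below is one way that claim could be exercised; the test name
+# and the second dataset are illustrative assumptions.
+def test_state_dict_round_trip_sketch():
+    pca_a = PCA(n_components=3)
+    pca_a.fit(X_torch)
+
+    # Fit a second instance on different data so its buffers exist but differ,
+    # then overwrite them with pca_a's state_dict.
+    pca_b = PCA(n_components=3)
+    pca_b.fit(torch.randn(50, 10))
+    pca_b.load_state_dict(pca_a.state_dict())
+
+    assert torch.allclose(pca_a.components_, pca_b.components_), "components_ buffer did not survive the state_dict round trip."
+    assert torch.allclose(pca_a.mean_, pca_b.mean_), "mean_ buffer did not survive the state_dict round trip."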
+
+def test_single_component():
+    pca_single = PCA(n_components=1)
+    pca_single.fit(X_torch)
+
+    assert pca_single.components_.shape[0] == 1, "PCA did not correctly reduce data to a single component."
+    assert pca_single.components_.shape[1] == X_torch.shape[1], "The single component does not have the correct dimensionality."
+
+def test_more_components_than_features():
+    n_features = X_torch.shape[1]
+    pca_excessive = PCA(n_components=n_features + 5)  # Request more components than available features
+    pca_excessive.fit(X_torch)
+
+    # Should only return a number of components equal to the number of features
+    assert pca_excessive.components_.shape[0] == n_features, "PCA returned more components than the number of input features."
+
+def test_data_preparation():
+    pca_center_scale = PCA(center=True, zscale=True)
+    pca_center_scale.fit(X_torch)
+
+    # Compare the stored mean_ and std_ buffers against manually calculated values.
+    # torch.std uses Bessel's correction by default, so use ddof=1 on the numpy side.
+    mean_diff = np.abs(X_np.mean(axis=0) - pca_center_scale.mean_.numpy())
+    std_diff = np.abs(X_np.std(axis=0, ddof=1) - pca_center_scale.std_.numpy())
+
+    assert np.all(mean_diff < 1e-5), "Data centering (mean subtraction) is incorrect."
+    assert np.all(std_diff < 1e-5), "Data scaling (division by std) is incorrect."
+
+def test_singular_values_and_vectors():
+    pca_svd = PCA(n_components=5)
+    pca_svd.fit(X_torch)
+
+    # sklearn's singular values
+    pca_sklearn = sklearnPCA(n_components=5, svd_solver='full').fit(X_np)
+
+    # Singular values should match
+    assert np.allclose(pca_svd.singular_values_.numpy(), pca_sklearn.singular_values_, atol=1e-5), "Singular values do not match."
+
+    # There's no direct equivalent for left singular vectors in sklearn's PCA output,
+    # but ensuring our singular values match is a good indication of correctness.
+
+def test_low_rank_approximation_accuracy():
+    pca_low_rank = PCA(n_components=5, use_lowRank=True, lowRank_niter=10)
+    pca_low_rank.fit(X_torch)
+
+    pca_full_rank = PCA(n_components=5, use_lowRank=False)
+    pca_full_rank.fit(X_torch)
+
+    # While we can't expect the low-rank approximation to exactly match the full-rank results,
+    # we can check that they're reasonably close, implying the approximation is reasonable.
+    components_diff = np.abs(pca_low_rank.components_.numpy() - pca_full_rank.components_.numpy())
+    assert components_diff.mean() < 0.1, "Low-rank approximation deviates too much from full SVD."
+
+def test_n_components_effect():
+    for n in [2, 5, 8]:
+        pca_n = PCA(n_components=n)
+        pca_n.fit(X_torch)
+
+        assert pca_n.components_.shape[0] == n, f"PCA with n_components={n} did not produce the correct number of components."
+        assert pca_n.explained_variance_.shape[0] == n, f"PCA with n_components={n} did not produce the correct number of explained variances."
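+
+# Usage sketch (illustrative, not part of the test suite): a minimal end-to-end
+# example of the PCA module, including moving the fitted buffers to another device
+# via .to(), as described in the class docstring. Running it via
+# `python -m bnpm.tests.test_PCA` is an assumed invocation; the CUDA branch only
+# applies when a GPU build of PyTorch is available.
+if __name__ == "__main__":
+    device = "cuda" if torch.cuda.is_available() else "cpu"
+    pca = PCA(n_components=3)
+    pca.fit(X_torch)
+    pca = pca.to(device)  # buffers (components_, mean_, ...) move with the module
+    scores = pca.transform(X_torch.to(device))
+    X_hat = pca.inverse_transform(scores)
+    print("scores:", tuple(scores.shape), "reconstruction:", tuple(X_hat.shape))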