From 09483c8b9a35383c334e61a18f0464e075203669 Mon Sep 17 00:00:00 2001 From: Stud Date: Thu, 18 Jul 2024 15:58:31 +0200 Subject: [PATCH 1/3] ProDy PCA code with implemented option to run automized Kernel PCA with grid-based cross-validation --- prody/dynamics/pca.py | 268 +++++++++++++++++++++++++++++++++--------- 1 file changed, 212 insertions(+), 56 deletions(-) diff --git a/prody/dynamics/pca.py b/prody/dynamics/pca.py index 5653c4936..d5b2d6e7b 100644 --- a/prody/dynamics/pca.py +++ b/prody/dynamics/pca.py @@ -3,39 +3,47 @@ essential dynamics analysis (EDA) calculations.""" import time - import numpy as np +import warnings +from scipy.spatial.distance import pdist, squareform +from scipy.linalg import eigh + +from sklearn.model_selection import GridSearchCV +from sklearn.linear_model import LogisticRegression +from sklearn.pipeline import Pipeline +from sklearn.decomposition import KernelPCA +from sklearn.cluster import KMeans +from sklearn.preprocessing import StandardScaler from prody import LOGGER, PY2K from prody.atomic import Atomic from prody.ensemble import Ensemble, PDBEnsemble from prody.trajectory import TrajBase -from prody.utilities import importLA, solveEig, ZERO +from prody.utilities import importLA, ZERO, solveEig from .nma import NMA + if PY2K: range = xrange __all__ = ['PCA', 'EDA'] - class PCA(NMA): + def __init__(self, name): + super().__init__(name) + self._cov = None + self._eigvals = None + self._array = None + self._vars = None + self._n_modes = None - """A class for Principal Component Analysis (PCA) of conformational - ensembles. See examples in :ref:`pca`.""" - - def __init__(self, name='Unknown'): - - NMA.__init__(self, name) def setCovariance(self, covariance, is3d=True): """Set covariance matrix.""" - if not isinstance(covariance, np.ndarray): raise TypeError('covariance must be an ndarray') - elif not (covariance.ndim == 2 and - covariance.shape[0] == covariance.shape[1]): + elif not (covariance.ndim == 2 and covariance.shape[0] == covariance.shape[1]): raise ValueError('covariance must be square matrix') elif covariance.dtype != float: try: @@ -73,7 +81,6 @@ def buildCovariance(self, coordsets, **kwargs): coordinate set (see :meth:`.Frame.superpose`). If frames are already aligned, use ``aligned=True`` argument to skip this step. - .. note:: If *coordsets* is a :class:`.PDBEnsemble` instance, coordinates are treated specially. Let's say **C**\_ij is the element of the @@ -109,7 +116,6 @@ def buildCovariance(self, coordsets, **kwargs): n_atoms = coordsets.numSelected() dof = n_atoms * 3 cov = np.zeros((dof, dof)) - #mean = coordsets._getCoords().flatten() n_confs = 0 n_frames = len(coordsets) if not quiet: @@ -161,7 +167,7 @@ def buildCovariance(self, coordsets, **kwargs): mean = coordsets.mean(0) if not quiet: LOGGER.progress('Building covariance', n_confs, - '_prody_pca') + '_prody_pca') for i, coords in enumerate(coordsets.reshape(s)): deviations = coords - mean cov += np.outer(deviations, deviations) @@ -181,10 +187,10 @@ def buildCovariance(self, coordsets, **kwargs): divide_by = weights.astype(float).repeat(3, axis=2).reshape(s) self._cov = np.dot(d_xyz.T, d_xyz) / np.dot(divide_by.T, divide_by) - if update_coords and ensemble is not None: - if mean is None: - mean = coordsets.mean(0) - ensemble.setCoords(mean) + if update_coords and ensemble is not None: + if mean is None: + mean = coordsets.mean(0) + ensemble.setCoords(mean) self._trace = self._cov.trace() self._dof = dof @@ -192,41 +198,6 @@ def buildCovariance(self, coordsets, **kwargs): if not quiet: LOGGER.report('Covariance matrix calculated in %2fs.', '_prody_pca') - def calcModes(self, n_modes=20, turbo=True, **kwargs): - """Calculate principal (or essential) modes. This method uses - :func:`scipy.linalg.eigh`, or :func:`numpy.linalg.eigh`, function - to diagonalize the covariance matrix. - - :arg n_modes: number of non-zero eigenvalues/vectors to calculate, - default is 20, - if **None** or ``'all'`` is given, all modes will be calculated - :type n_modes: int - - :arg turbo: when available, use a memory intensive but faster way to - calculate modes, default is **True** - :type turbo: bool""" - - if self._cov is None: - raise ValueError('covariance matrix is not built or set') - start = time.time() - self._clear() - if str(n_modes).lower() == 'all': - n_modes = None - - values, vectors, _ = solveEig(self._cov, n_modes=n_modes, zeros=True, - turbo=turbo, reverse=True, **kwargs) - which = values > ZERO - self._eigvals = values[which] - self._array = vectors[:, which] - self._vars = values[which] - self._n_modes = len(self._eigvals) - if self._n_modes > 1: - LOGGER.debug('{0} modes were calculated in {1:.2f}s.' - .format(self._n_modes, time.time()-start)) - else: - LOGGER.debug('{0} mode was calculated in {1:.2f}s.' - .format(self._n_modes, time.time()-start)) - def performSVD(self, coordsets): """Calculate principal modes using singular value decomposition (SVD). *coordsets* argument may be a :class:`.Atomic`, :class:`.Ensemble`, @@ -288,7 +259,7 @@ def addEigenpair(self, eigenvector, eigenvalue=None): If eigen *value* is omitted, it will be set to 1. Eigenvalues are set as variances.""" - NMA.addEigenpair(self, eigenvector, eigenvalue) + super().addEigenpair(self, eigenvector, eigenvalue) self._vars = self._eigvals def setEigens(self, vectors, values=None): @@ -296,8 +267,193 @@ def setEigens(self, vectors, values=None): omitted, they will be set to 1. Eigenvalues are set as variances.""" self._clear() - NMA.setEigens(self, vectors, values) + super().setEigens(self, vectors, values) self._vars = self._eigvals + + + def computeKernelMatrix(self, coordsets, kernel='rbf', degree=3, gamma=None, alpha=1.0, coef0=0.0): + """Compute the kernel matrix for Kernel PCA.""" + + LOGGER.info("Computing Kernel Matrix") + if isinstance(coordsets, np.ndarray): + coordsets = coordsets.reshape((coordsets.shape[0], -1)) + elif isinstance(coordsets, Ensemble): + coordsets = coordsets.getCoordsets().reshape((coordsets.numConfs(), -1)) + else: + raise TypeError('X must be a numpy array or Ensemble') + + if kernel == 'linear': + return np.dot(coordsets, coordsets.T) + elif kernel == 'poly': + return (np.dot(coordsets, coordsets.T) + 1) ** degree + elif kernel == 'rbf': + if gamma is None: + gamma = 1.0 / coordsets.shape[1] + sq_dists = pdist(coordsets, 'sqeuclidean') + mat_sq_dists = squareform(sq_dists) + return np.exp(-gamma * mat_sq_dists) + elif kernel == 'sigmoid': + return np.tanh(alpha * np.dot(coordsets, coordsets.T) + coef0) + else: + raise ValueError(f"Unsupported kernel: {kernel}") + + LOGGER.info("Kernel Matrix computed") + + def calcModes(self, n_modes=20, turbo=True, kernel=None, degree=3, gamma=None, alpha=1.0, coef0=0.0, cv=5, max_iter=500, solver='lbfsg', **kwargs): + """Calculate principal (or essential) modes using Kernel PCA if a kernel is specified.""" + + if kernel: + LOGGER.info("Performing Kernel PCA") + if self._cov is None: + raise ValueError('covariance matrix is not built or set') + start = time.time() + self._clear() + if str(n_modes).lower() == 'all': + n_modes = None + + #Create artifical labels using KMeans + labels = self.createArtificialLabels(self._cov) + + # Optimize kernel PCA parameters + if kernel in ['rbf', 'poly', 'sigmoid']: + best_params = self.optimizeKernelPCAParams(kernel=kernel, n_modes=n_modes, cv=cv, labels=labels, max_iter=max_iter) + degree = best_params.get('kpca__degree', degree) + gamma = best_params.get('kpca__gamma', gamma) + alpha = best_params.get('kpca__alpha', alpha) + coef0 = best_params.get('kpca__coef0', coef0) + + # Compute the kernel matrix + K = self.computeKernelMatrix(self._cov, kernel=kernel, degree=degree, gamma=gamma, alpha=alpha, coef0=coef0) + + # Center the kernel matrix + N = K.shape[0] + one_n = np.ones((N, N)) / N + K_centered = K - one_n @ K - K @ one_n + one_n @ K @ one_n + + # Solve the eigenvalue problem for the centered kernel matrix + values, vectors = eigh(K_centered) + which = values > ZERO + self._eigvals = values[which] + self._array = vectors[:, which] + self._vars = values[which] + self._n_modes = len(self._eigvals) + if self._n_modes > 1: + LOGGER.debug('{0} modes were calculated in {1:.2f}s.' + .format(self._n_modes, time.time()-start)) + else: + LOGGER.debug('{0} mode was calculated in {1:.2f}s.' + .format(self._n_modes, time.time()-start)) + else: + # Fallback to original PCA calculation if no kernel is specified + LOGGER.info("Standard PCA path") + if self._cov is None: + raise ValueError('covariance matrix is not built or set') + start = time.time() + self._clear() + if str(n_modes).lower() == 'all': + n_modes = None + + values, vectors, _ = solveEig(self._cov, n_modes=n_modes, zeros=True, + turbo=turbo, reverse=True, **kwargs) + which = values > ZERO + self._eigvals = values[which] + self._array = vectors[:, which] + self._vars = values[which] + self._n_modes = len(self._eigvals) + if self._n_modes > 1: + LOGGER.debug('{0} modes were calculated in {1:.2f}s.' + .format(self._n_modes, time.time()-start)) + else: + LOGGER.debug('{0} mode was calculated in {1:.2f}s.' + .format(self._n_modes, time.time()-start)) + + + def findBestSolver(self, max_iter=500, cv=5, labels=None): + + log_reg = LogisticRegression(max_iter=max_iter) + + clf = Pipeline([ + ('scaler', StandardScaler()), + ('log_reg', log_reg) + ]) + + param_grid = { + 'log_reg__solver': ['lbfgs', 'liblinear', 'sag', 'saga', 'newton-cg'] + } + + LOGGER.info("Finding the best solver") + + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=UserWarning) + grid_search = GridSearchCV(clf, param_grid, cv=cv) + grid_search.fit(self._cov, labels) + + best_solver = grid_search.best_params_['log_reg__solver'] + LOGGER.info(f"Best solver found: {best_solver}") + + return best_solver + + def optimizeKernelPCAParams(self, kernel='rbf', n_modes=10, cv=5, labels=None, max_iter=500, **kwargs): + + best_solver = self.findBestSolver(max_iter=max_iter, cv=cv, labels=labels) + + if kernel == 'poly': + param_grid = { + 'kpca__degree': np.linspace(1, 6, 6) + } + elif kernel == 'rbf': + param_grid = { + 'kpca__gamma': np.linspace(0.03, 0.06, 20) + } + elif kernel == 'sigmoid': + param_grid = { + 'kpca__alpha': np.linspace(0.02, 1.00, 50) , + 'kpca__coef0': np.linspace(0.00, 10, 50) + } + else: + raise ValueError(f"Unsupported kernel: {kernel}") + + + log_reg = LogisticRegression(max_iter=max_iter, solver=best_solver) + + clf = Pipeline([ + ("scaler", StandardScaler()), + ("kpca", KernelPCA(n_components=n_modes, kernel=kernel)), + ("log_reg", log_reg)]) + + LOGGER.info("Pipeline clf created") + + grid_search = GridSearchCV(clf, param_grid, cv=cv) + grid_search.fit(self._cov, labels) + + # Log mean test score for each solver + cv_results = grid_search.cv_results_ + LOGGER.info("Mean test scores:") + for params, mean_score in zip(cv_results['params'], cv_results['mean_test_score']): + LOGGER.info(f"Solver: {params['log_reg__solver']}, Mean score: {mean_score:.4f}") + + best_params = grid_search.best_params_ + LOGGER.info(f"Best parameters found for kernel {kernel}: {best_params}") + + return best_params + + + def createArtificialLabels(self, coordsets, n_clusters=2): + """Create artificial labels using KMeans clustering.""" + kmeans = KMeans(n_clusters=n_clusters, random_state=42).fit(coordsets) + return kmeans.labels_ + + + def _clear(self): + """Clear attributes.""" + self._eigvals = None + self._array = None + self._vars = None + self._n_modes = None + + def _report(self): + """Report calculation results.""" + pass # Replace with actual reporting functionality class EDA(PCA): From 1f1dfe469d53d119c690209cc16b34f8418fe708 Mon Sep 17 00:00:00 2001 From: Stud Date: Mon, 22 Jul 2024 10:11:00 +0200 Subject: [PATCH 2/3] (Kernel) PCA updated file --- prody/dynamics/pca.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/prody/dynamics/pca.py b/prody/dynamics/pca.py index d5b2d6e7b..b6e2aa4ef 100644 --- a/prody/dynamics/pca.py +++ b/prody/dynamics/pca.py @@ -299,7 +299,7 @@ def computeKernelMatrix(self, coordsets, kernel='rbf', degree=3, gamma=None, alp LOGGER.info("Kernel Matrix computed") - def calcModes(self, n_modes=20, turbo=True, kernel=None, degree=3, gamma=None, alpha=1.0, coef0=0.0, cv=5, max_iter=500, solver='lbfsg', **kwargs): + def calcModes(self, n_modes=20, turbo=True, kernel=None, degree=3, gamma=None, alpha=1.0, coef0=0.0, cv=5, max_iter=500, **kwargs): """Calculate principal (or essential) modes using Kernel PCA if a kernel is specified.""" if kernel: @@ -392,6 +392,7 @@ def findBestSolver(self, max_iter=500, cv=5, labels=None): LOGGER.info(f"Best solver found: {best_solver}") return best_solver + def optimizeKernelPCAParams(self, kernel='rbf', n_modes=10, cv=5, labels=None, max_iter=500, **kwargs): From 518428f5b1d9ac632008c81ef8c187817b62da2f Mon Sep 17 00:00:00 2001 From: Stud Date: Fri, 2 Aug 2024 13:56:10 +0200 Subject: [PATCH 3/3] PCA file including Kernel PCA script and Standard PCA script --- prody/dynamics/pca.py | 543 +++++++++++++++++++++++++++--------------- 1 file changed, 347 insertions(+), 196 deletions(-) diff --git a/prody/dynamics/pca.py b/prody/dynamics/pca.py index b6e2aa4ef..0d2e101a4 100644 --- a/prody/dynamics/pca.py +++ b/prody/dynamics/pca.py @@ -8,14 +8,15 @@ from scipy.spatial.distance import pdist, squareform from scipy.linalg import eigh -from sklearn.model_selection import GridSearchCV +from sklearn.model_selection import GridSearchCV, cross_val_score from sklearn.linear_model import LogisticRegression from sklearn.pipeline import Pipeline from sklearn.decomposition import KernelPCA from sklearn.cluster import KMeans from sklearn.preprocessing import StandardScaler +from sklearn.exceptions import ConvergenceWarning -from prody import LOGGER, PY2K +from prody import LOGGER from prody.atomic import Atomic from prody.ensemble import Ensemble, PDBEnsemble from prody.trajectory import TrajBase @@ -24,26 +25,311 @@ from .nma import NMA -if PY2K: - range = xrange - __all__ = ['PCA', 'EDA'] class PCA(NMA): - def __init__(self, name): + def __init__(self, name='Unknown'): super().__init__(name) - self._cov = None self._eigvals = None self._array = None self._vars = None self._n_modes = None + + NMA.__init__(self, name) + + + def computeKernelMatrix(self, coordsets, kernel='rbf', degree=3, gamma=None, alpha=1.0, coef0=0.0): + """By specifying any Kernel when calling the calcModes function + additionally an adequate matrix has to be calculated to perform + Kernel PCA. This function computes the kernel matrix for Kernel PCA. + For each kernel the appropriate equation with required parameters + is called. + + linear: no additional parameter required + poly: degree (default = 3), recommended 2-6 + rbf: gamma (default = None, integreted calculation using coordsets.shape), recommended 0.3-0.8 + sigmoid: alpha, coef0 (default = 1.0 and 0.0), recommended 0.2-1.0 and 0-10""" + + LOGGER.info("Computing Kernel Matrix") + + #LOGGER.info(f"{coordsets.ndim}") + + if coordsets.ndim == 1: + coordsets = coordsets.reshape(-1, 1) + + #LOGGER.info(f"{coordsets}") + + if kernel == 'linear': + K = np.dot(coordsets, coordsets.T) + #LOGGER.info(f"{K}") + elif kernel == 'poly': + K = (np.dot(coordsets, coordsets.T) + 1) ** degree + #LOGGER.info(f"{K}") + elif kernel == 'rbf': + if gamma is None: + gamma = 1.0 / coordsets.shape[1] + sq_dists = pdist(coordsets, 'sqeuclidean') + mat_sq_dists = squareform(sq_dists) + K = np.exp(-gamma * mat_sq_dists) + #LOGGER.info(f"{K}") + elif kernel == 'sigmoid': + K = np.tanh(alpha * np.dot(coordsets, coordsets.T) + coef0) + #LOGGER.info(f"{K}") + else: + raise ValueError(f"Unknown kernel: {kernel}") + + LOGGER.info("Kernel Matrix computed") + + return K + + + def createArtificialLabels(self, coordsets, kernel='rbf', degree=3, gamma=None, alpha=1.0, coef0=0.0, n_clusters=2): + """Create artificial labels using KMeans clustering. + necessary for Cross Validation process; + else it returns an error""" + + K = self.computeKernelMatrix(coordsets, kernel=kernel, degree=degree, gamma=gamma, alpha=alpha, coef0=coef0) + LOGGER.info(f"{K}") + kmeans = KMeans(n_clusters=n_clusters, random_state=42).fit(K) + LOGGER.info(f"{kmeans}") + return kmeans.labels_ + + def calcModes(self, coordsets, n_modes=20, turbo=True, kernel='rbf', degree=3, gamma=None, alpha=1.0, coef0=0.0, optimize_params=False, best_kernel=False, max_iter=500, labels=None, **kwargs): + """Calculate principal (or essential) modes using Kernel PCA with a kernel specified. + + linear: no additional parameter required + poly: degree (default = 3), recommended 2-6 + rbf: gamma (default = None, integreted calculation using coordsets.shape), recommended 0.3-0.8 + sigmoid: alpha, coef0 (default = 1.0 and 0.0), recommended 0.2-1.0 and 0-10""" + + LOGGER.timeit('_prody_pca') + start = time.time() + + # check if coordsets are either Ensemble, Atomic, TrajBase or np.ndarray + if not isinstance(coordsets, (Ensemble, Atomic, TrajBase, np.ndarray)): + raise TypeError('coordsets must be an Ensemble, Atomic, Numpy array instance') + + # Check if coordsets has the correct shape and extract coordinates according to its features + if isinstance(coordsets, np.ndarray): + if (coordsets.ndim != 3 or coordsets.shape[2] != 3 or coordsets.dtype not in (np.float32, float)): + raise ValueError('coordsets is not a valid coordinate array') + elif isinstance(coordsets, Atomic): + coordsets = coordsets._getCoordsets() + elif isinstance(coordsets, Ensemble): + self._atoms = coordsets.getAtoms() + LOGGER.debug(f"Atom set: {self._atoms}") + if isinstance(coordsets, PDBEnsemble): + weights = coordsets.getWeights() > 0 + coordsets = coordsets._getCoordsets() + if coordsets.ndim != 3 or coordsets.shape[2] != 3: + raise ValueError('coordsets should have shape (n_frames, n_atoms, 3)') + + # reshape coordsets to fit the required shape if necessary + coordsets = coordsets.reshape((coordsets.shape[0], -1)) + n_atoms_from_coordsets = coordsets.shape[1] // 3 + + # debug message to check if the coordinates are parsed correctly + if self._atoms is not None: + num_atoms_from_ensemble = len(self._atoms) + LOGGER.debug(f"Number of atoms from ensemble: {num_atoms_from_ensemble}") + if n_atoms_from_coordsets != num_atoms_from_ensemble: + raise ValueError(f'Number of atoms in ensemble ({num_atoms_from_ensemble}) does not match coordsets ({n_atoms_from_coordsets})') + + # Create artifical labels using KMeans for Logistic Regression + if labels is None: + labels = self.createArtificialLabels(coordsets, kernel=kernel, degree=degree, gamma=gamma, alpha=alpha, coef0=coef0) + + # optimize parameters if optimize_params=True + if optimize_params==True: + # Optimize kernel PCA parameters when chosing a kernel manually + if kernel in ['rbf', 'poly', 'sigmoid'] and any(param is None for param in [degree, gamma, alpha, coef0]): + best_params = self.optimizeKernelPCAParams(coordsets, kernel=kernel, n_modes=n_modes, max_iter=max_iter, labels=labels) + degree = best_params.get('kpca__degree', degree) + gamma = best_params.get('kpca__gamma', gamma) + alpha = best_params.get('kpca__alpha', alpha) + coef0 = best_params.get('kpca__coef0', coef0) + + # Compute the kernel matrix + K = self.computeKernelMatrix(coordsets, kernel=kernel, degree=degree, gamma=gamma, alpha=alpha, coef0=coef0) + #LOGGER.info(f"{np.array(K)}") + + # Center the kernel matrix + N = K.shape[0] + #LOGGER.info(f"{N}") + one_n = np.ones((N, N)) / N + #LOGGER.info(f"{one_n}") + K_centered = K - one_n @ K - K @ one_n + one_n @ K @ one_n + #LOGGER.info(f"{K_centered}") + + # Solve the eigenvalue problem for the centered kernel matrix + #values, vectors = eigh(K_centered) + values, vectors, _ = solveEig(K_centered, n_modes=n_modes, zeros=True, turbo=turbo, reverse=True, **kwargs) + #LOGGER.info(f"{values}") + #LOGGER.info(f"{vectors}") + + # Filter very small and negative eigenvalues + # ZERO = 1e-10 # treshold can be hardcoded here + which = values > ZERO + #LOGGER.info(f"{which}") + values = values[which] + #LOGGER.info(f"{values}") + vectors = vectors[:, which] + #LOGGER.info(f"{vectors}") + + # Select the top n_modes eigenvalues and eigenvectors + # not sure if this makes any difference for other systems, it did not for the ones I tested + indices = np.argsort(values)[::-1][:n_modes] + #LOGGER.info(f"{indices}") + values = values[indices] + #LOGGER.info(f"{values}") + vectors = vectors[:, indices] + #LOGGER.info(f"{vectors}") + + # assign necessary data + self._eigvals = values + #LOGGER.info(f"{self._eigvals}") + self._array = vectors + #LOGGER.info(f"{self._array}") + self._vars = values + #LOGGER.info(f"{self._eigvals}") + self._n_modes = len(values) + #LOGGER.info(f"{self._n_modes}") + + if self._n_modes > 1: + LOGGER.debug('{0} modes were calculated in {1:.2f}s.'.format(self._n_modes, time.time()-start)) + else: + LOGGER.debug('{0} mode was calculated in {1:.2f}s.'.format(self._n_modes, time.time()-start)) + #LOGGER.debug(f'PCA eigenvalues shape: {self._eigvals.shape}') + #LOGGER.debug(f'PCA array shape: {self._array.shape}') + + # Ensure the results are in the expected format + if self._array.shape[1] != n_modes: + raise ValueError(f'Number of modes in PCA array ({self._array.shape[1]}) does not match expected ({n_modes})') + + + def optimizeKernelPCAParams(self, coordsets, kernel='rbf', n_modes=10, cv=5, max_iter=500, solver='sag', labels=None, **kwargs): + """This function is used to determine the best parameters + for each Kernel of the Kernel PCA approach using a Pipeline + consisting of a Standard Scaler to scale the data accordingly, + an implemented Kernel PCA from the sklearn package and + a linear Regression model with the predefined 'newton-cg' solver. + The Cross Validation to determine the best parameters is based + on a grid-search algorithm which relies on a K-Mean validation process. + + Output: best parameters for each kernel (default: rbf) + """ + + if kernel == 'poly': + param_grid = { + 'kpca__degree': np.arange(1, 7) + } + LOGGER.info(f"{param_grid}") + elif kernel == 'linear': + param_grid = {} + LOGGER.info(f"{param_grid}") + elif kernel == 'rbf': + param_grid = { + 'kpca__gamma': np.logspace(-3, 3, 7) + } + LOGGER.info(f"{param_grid}") + elif kernel == 'sigmoid': + param_grid = { + 'kpca__alpha': np.linspace(0.02, 1.00, 10) , + 'kpca__coef0': np.linspace(0.00, 10, 10) + } + LOGGER.info(f"{param_grid}") + else: + raise ValueError(f"Unsupported kernel: {kernel}") + + # create the logistic regression calculation + log_reg = LogisticRegression(max_iter=max_iter, solver=solver) + LOGGER.info(f"{log_reg}") + + # create the cross validation pipeline + clf = Pipeline([ + ("scaler", StandardScaler()), + ("kpca", KernelPCA(n_components=n_modes, kernel=kernel)), + ("log_reg", log_reg)]) + LOGGER.info(f"{clf}") + #LOGGER.info("Pipeline clf created") + + # run cross validation of the parameters + if param_grid: + grid_search = GridSearchCV(clf, param_grid, cv=cv) + grid_search.fit(coordsets, labels) + best_params = grid_search.best_params_ + LOGGER.info(f"Best parameters found for kernel {kernel}: {best_params}") + else: + best_params = {} + LOGGER.info(f"No parameter tuning required for kernel {kernel}") + + return best_params + + + ## not implemented in the calculation yet + def BestKernel(self, n_modes=10, cv=5, max_iter=500, solver='newton-cg', **kwargs): + """Use this function to determine the best Kernel for your dataset + solver set to 'newton-cg' on default again + sigmoid runs a very long time; consider not using this one + """ + + LOGGER.info("Testing the best Kernel for Kernel PCA") + + # Define Kernel types + kernels = ['linear', 'poly', 'rbf', 'sigmoid'] # hardcoded here, more could be added, or deleted but then need to adapt also all other functions + best_kernel = None + best_params = None + best_score = -np.inf + + + # Test Kernels + for kernel in kernels: + LOGGER.info(f"Testing kernel: {kernel}") + + # Optimize kernel PCA parameters for current kernel and create pipeline with Kernel PCA and Logistic Regression + params = self.optimizeKernelPCAParams(coordsets, kernel=kernel, n_modes=n_modes, cv=cv, max_iter=max_iter) + + if kernel == 'linear': + kpca_params = {} + else: + kpca_params = {key[len('kpca__'):]: value for key, value in params.items() if key.startswith('kpca__')} + + kpca = KernelPCA(n_components=n_modes, kernel=kernel, **kpca_params) + log_reg = LogisticRegression(max_iter=max_iter, solver=solver) + + clf = Pipeline([ + ("scaler", StandardScaler()), + ("kpca", kpca), + ("log_reg", log_reg) + ]) + + scores = cross_val_score(clf, coordsets, cv=cv, scoring='accuracy', n_jobs=-1) + score = scores.mean() + + LOGGER.info(f"Kernel: {kernel}, Mean CV Score: {mean_score:.4f}") + + # Store best kernel and keep it updated + if score > best_score: + best_score = score + best_kernel = kernel + best_params = params + + LOGGER.info(f"Best kernel: {best_kernel} with a score of: {best_score:.4f}") + LOGGER.info(f"Best parameters: {best_params}") + + return best_kernel, best_params + + # Standard PCA script starts here + def setCovariance(self, covariance, is3d=True): """Set covariance matrix.""" + if not isinstance(covariance, np.ndarray): raise TypeError('covariance must be an ndarray') - elif not (covariance.ndim == 2 and covariance.shape[0] == covariance.shape[1]): + elif not (covariance.ndim == 2 and + covariance.shape[0] == covariance.shape[1]): raise ValueError('covariance must be square matrix') elif covariance.dtype != float: try: @@ -62,7 +348,7 @@ def setCovariance(self, covariance, is3d=True): else: self._n_atoms = self._dof self._trace = self._cov.trace() - + def buildCovariance(self, coordsets, **kwargs): """Build a covariance matrix for *coordsets* using mean coordinates as the reference. *coordsets* argument may be one of the following: @@ -81,6 +367,7 @@ def buildCovariance(self, coordsets, **kwargs): coordinate set (see :meth:`.Frame.superpose`). If frames are already aligned, use ``aligned=True`` argument to skip this step. + .. note:: If *coordsets* is a :class:`.PDBEnsemble` instance, coordinates are treated specially. Let's say **C**\_ij is the element of the @@ -116,6 +403,7 @@ def buildCovariance(self, coordsets, **kwargs): n_atoms = coordsets.numSelected() dof = n_atoms * 3 cov = np.zeros((dof, dof)) + #mean = coordsets._getCoords().flatten() n_confs = 0 n_frames = len(coordsets) if not quiet: @@ -167,7 +455,7 @@ def buildCovariance(self, coordsets, **kwargs): mean = coordsets.mean(0) if not quiet: LOGGER.progress('Building covariance', n_confs, - '_prody_pca') + '_prody_pca') for i, coords in enumerate(coordsets.reshape(s)): deviations = coords - mean cov += np.outer(deviations, deviations) @@ -187,17 +475,54 @@ def buildCovariance(self, coordsets, **kwargs): divide_by = weights.astype(float).repeat(3, axis=2).reshape(s) self._cov = np.dot(d_xyz.T, d_xyz) / np.dot(divide_by.T, divide_by) - if update_coords and ensemble is not None: - if mean is None: - mean = coordsets.mean(0) - ensemble.setCoords(mean) + if update_coords and ensemble is not None: + if mean is None: + mean = coordsets.mean(0) + ensemble.setCoords(mean) self._trace = self._cov.trace() self._dof = dof self._n_atoms = n_atoms if not quiet: LOGGER.report('Covariance matrix calculated in %2fs.', '_prody_pca') - + + + def calcModesPCA(self, n_modes=20, turbo=True, **kwargs): + """Calculate principal (or essential) modes. This method uses + :func:`scipy.linalg.eigh`, or :func:`numpy.linalg.eigh`, function + to diagonalize the covariance matrix. + + :arg n_modes: number of non-zero eigenvalues/vectors to calculate, + default is 20, + if **None** or ``'all'`` is given, all modes will be calculated + :type n_modes: int + + :arg turbo: when available, use a memory intensive but faster way to + calculate modes, default is **True** + :type turbo: bool""" + + if self._cov is None: + raise ValueError('covariance matrix is not built or set') + start = time.time() + self._clear() + if str(n_modes).lower() == 'all': + n_modes = None + + values, vectors, _ = solveEig(self._cov, n_modes=n_modes, zeros=True, + turbo=turbo, reverse=True, **kwargs) + which = values > ZERO + self._eigvals = values[which] + self._array = vectors[:, which] + self._vars = values[which] + self._n_modes = len(self._eigvals) + if self._n_modes > 1: + LOGGER.debug('{0} modes were calculated in {1:.2f}s.' + .format(self._n_modes, time.time()-start)) + else: + LOGGER.debug('{0} mode was calculated in {1:.2f}s.' + .format(self._n_modes, time.time()-start)) + + def performSVD(self, coordsets): """Calculate principal modes using singular value decomposition (SVD). *coordsets* argument may be a :class:`.Atomic`, :class:`.Ensemble`, @@ -252,209 +577,35 @@ def performSVD(self, coordsets): self._trace = self._vars.sum() self._n_modes = len(self._eigvals) LOGGER.debug('{0} modes were calculated in {1:.2f}s.' - .format(self._n_modes, time.time()-start)) - + .format(self._n_modes, time.time()-start)) + def addEigenpair(self, eigenvector, eigenvalue=None): """Add eigen *vector* and eigen *value* pair(s) to the instance. If eigen *value* is omitted, it will be set to 1. Eigenvalues are set as variances.""" - super().addEigenpair(self, eigenvector, eigenvalue) + NMA.addEigenpair(self, eigenvector, eigenvalue) self._vars = self._eigvals + def setEigens(self, vectors, values=None): """Set eigen *vectors* and eigen *values*. If eigen *values* are omitted, they will be set to 1. Eigenvalues are set as variances.""" self._clear() - super().setEigens(self, vectors, values) + NMA.setEigens(self, vectors, values) self._vars = self._eigvals - - - def computeKernelMatrix(self, coordsets, kernel='rbf', degree=3, gamma=None, alpha=1.0, coef0=0.0): - """Compute the kernel matrix for Kernel PCA.""" - - LOGGER.info("Computing Kernel Matrix") - if isinstance(coordsets, np.ndarray): - coordsets = coordsets.reshape((coordsets.shape[0], -1)) - elif isinstance(coordsets, Ensemble): - coordsets = coordsets.getCoordsets().reshape((coordsets.numConfs(), -1)) - else: - raise TypeError('X must be a numpy array or Ensemble') - - if kernel == 'linear': - return np.dot(coordsets, coordsets.T) - elif kernel == 'poly': - return (np.dot(coordsets, coordsets.T) + 1) ** degree - elif kernel == 'rbf': - if gamma is None: - gamma = 1.0 / coordsets.shape[1] - sq_dists = pdist(coordsets, 'sqeuclidean') - mat_sq_dists = squareform(sq_dists) - return np.exp(-gamma * mat_sq_dists) - elif kernel == 'sigmoid': - return np.tanh(alpha * np.dot(coordsets, coordsets.T) + coef0) - else: - raise ValueError(f"Unsupported kernel: {kernel}") - - LOGGER.info("Kernel Matrix computed") - - def calcModes(self, n_modes=20, turbo=True, kernel=None, degree=3, gamma=None, alpha=1.0, coef0=0.0, cv=5, max_iter=500, **kwargs): - """Calculate principal (or essential) modes using Kernel PCA if a kernel is specified.""" - - if kernel: - LOGGER.info("Performing Kernel PCA") - if self._cov is None: - raise ValueError('covariance matrix is not built or set') - start = time.time() - self._clear() - if str(n_modes).lower() == 'all': - n_modes = None - - #Create artifical labels using KMeans - labels = self.createArtificialLabels(self._cov) - - # Optimize kernel PCA parameters - if kernel in ['rbf', 'poly', 'sigmoid']: - best_params = self.optimizeKernelPCAParams(kernel=kernel, n_modes=n_modes, cv=cv, labels=labels, max_iter=max_iter) - degree = best_params.get('kpca__degree', degree) - gamma = best_params.get('kpca__gamma', gamma) - alpha = best_params.get('kpca__alpha', alpha) - coef0 = best_params.get('kpca__coef0', coef0) - - # Compute the kernel matrix - K = self.computeKernelMatrix(self._cov, kernel=kernel, degree=degree, gamma=gamma, alpha=alpha, coef0=coef0) - - # Center the kernel matrix - N = K.shape[0] - one_n = np.ones((N, N)) / N - K_centered = K - one_n @ K - K @ one_n + one_n @ K @ one_n - - # Solve the eigenvalue problem for the centered kernel matrix - values, vectors = eigh(K_centered) - which = values > ZERO - self._eigvals = values[which] - self._array = vectors[:, which] - self._vars = values[which] - self._n_modes = len(self._eigvals) - if self._n_modes > 1: - LOGGER.debug('{0} modes were calculated in {1:.2f}s.' - .format(self._n_modes, time.time()-start)) - else: - LOGGER.debug('{0} mode was calculated in {1:.2f}s.' - .format(self._n_modes, time.time()-start)) - else: - # Fallback to original PCA calculation if no kernel is specified - LOGGER.info("Standard PCA path") - if self._cov is None: - raise ValueError('covariance matrix is not built or set') - start = time.time() - self._clear() - if str(n_modes).lower() == 'all': - n_modes = None - - values, vectors, _ = solveEig(self._cov, n_modes=n_modes, zeros=True, - turbo=turbo, reverse=True, **kwargs) - which = values > ZERO - self._eigvals = values[which] - self._array = vectors[:, which] - self._vars = values[which] - self._n_modes = len(self._eigvals) - if self._n_modes > 1: - LOGGER.debug('{0} modes were calculated in {1:.2f}s.' - .format(self._n_modes, time.time()-start)) - else: - LOGGER.debug('{0} mode was calculated in {1:.2f}s.' - .format(self._n_modes, time.time()-start)) - - - def findBestSolver(self, max_iter=500, cv=5, labels=None): - log_reg = LogisticRegression(max_iter=max_iter) - - clf = Pipeline([ - ('scaler', StandardScaler()), - ('log_reg', log_reg) - ]) - - param_grid = { - 'log_reg__solver': ['lbfgs', 'liblinear', 'sag', 'saga', 'newton-cg'] - } - - LOGGER.info("Finding the best solver") - - with warnings.catch_warnings(): - warnings.filterwarnings("ignore", category=UserWarning) - grid_search = GridSearchCV(clf, param_grid, cv=cv) - grid_search.fit(self._cov, labels) - - best_solver = grid_search.best_params_['log_reg__solver'] - LOGGER.info(f"Best solver found: {best_solver}") - - return best_solver - - def optimizeKernelPCAParams(self, kernel='rbf', n_modes=10, cv=5, labels=None, max_iter=500, **kwargs): - - best_solver = self.findBestSolver(max_iter=max_iter, cv=cv, labels=labels) - - if kernel == 'poly': - param_grid = { - 'kpca__degree': np.linspace(1, 6, 6) - } - elif kernel == 'rbf': - param_grid = { - 'kpca__gamma': np.linspace(0.03, 0.06, 20) - } - elif kernel == 'sigmoid': - param_grid = { - 'kpca__alpha': np.linspace(0.02, 1.00, 50) , - 'kpca__coef0': np.linspace(0.00, 10, 50) - } - else: - raise ValueError(f"Unsupported kernel: {kernel}") - - - log_reg = LogisticRegression(max_iter=max_iter, solver=best_solver) - - clf = Pipeline([ - ("scaler", StandardScaler()), - ("kpca", KernelPCA(n_components=n_modes, kernel=kernel)), - ("log_reg", log_reg)]) - - LOGGER.info("Pipeline clf created") - - grid_search = GridSearchCV(clf, param_grid, cv=cv) - grid_search.fit(self._cov, labels) - - # Log mean test score for each solver - cv_results = grid_search.cv_results_ - LOGGER.info("Mean test scores:") - for params, mean_score in zip(cv_results['params'], cv_results['mean_test_score']): - LOGGER.info(f"Solver: {params['log_reg__solver']}, Mean score: {mean_score:.4f}") - - best_params = grid_search.best_params_ - LOGGER.info(f"Best parameters found for kernel {kernel}: {best_params}") - - return best_params - - - def createArtificialLabels(self, coordsets, n_clusters=2): - """Create artificial labels using KMeans clustering.""" - kmeans = KMeans(n_clusters=n_clusters, random_state=42).fit(coordsets) - return kmeans.labels_ - - def _clear(self): """Clear attributes.""" self._eigvals = None self._array = None self._vars = None self._n_modes = None + + - def _report(self): - """Report calculation results.""" - pass # Replace with actual reporting functionality class EDA(PCA):