From fbf811d682fb62cfeb5e72b200f978c1d85caa61 Mon Sep 17 00:00:00 2001
From: nerkulec
Date: Mon, 11 Nov 2024 13:40:46 +0100
Subject: [PATCH] Fixed Mutual Information formula

---
 mala/network/mutual_information_analyzer.py | 185 +++++++++++----------
 1 file changed, 86 insertions(+), 99 deletions(-)

diff --git a/mala/network/mutual_information_analyzer.py b/mala/network/mutual_information_analyzer.py
index 8a40150a6..180159f03 100644
--- a/mala/network/mutual_information_analyzer.py
+++ b/mala/network/mutual_information_analyzer.py
@@ -13,12 +13,74 @@
 import sklearn.mixture
 import sklearn.covariance
 import matplotlib.pyplot as plt
-from sklearn.preprocessing import StandardScaler, Normalizer

 descriptor_input_types_acsd = descriptor_input_types + ["numpy", "openpmd"]
 target_input_types_acsd = target_input_types + ["numpy", "openpmd"]


+def normalize(data):
+    # Standardize each feature to zero mean and unit variance, dropping
+    # (near-)constant features to avoid division by zero.
+    mean = np.mean(data, axis=0)
+    std = np.std(data, axis=0)
+    std_nonzero = std > 1e-6
+    data = data[:, std_nonzero]
+    mean = mean[std_nonzero]
+    std = std[std_nonzero]
+    data = (data - mean) / std
+    return data
+
+
+def mutual_information(X, Y, n_components=None, max_iter=1000, n_samples=100000, covariance_type="diag", normalize_data=False):
+    # Estimate I(X; Y) by fitting a Gaussian mixture model (GMM) to the
+    # joint samples and reusing its component parameters for the marginals.
+    assert covariance_type == "diag", "only covariance_type='diag' is supported"
+    n = X.shape[0]
+    dim_X = X.shape[-1]
+    rand_subset = np.random.permutation(n)[:n_samples]
+    if normalize_data:
+        X = normalize(X)
+        Y = normalize(Y)
+    X = X[rand_subset]
+    Y = Y[rand_subset]
+    XY = np.concatenate([X, Y], axis=1)
+    d = XY.shape[-1]
+    if n_components is None:
+        n_components = d // 2
+    gmm_XY = sklearn.mixture.GaussianMixture(n_components=n_components, covariance_type=covariance_type, max_iter=max_iter)
+    gmm_XY.fit(XY)
+
+    # With diagonal covariances, the marginal over X is obtained exactly by
+    # slicing the first dim_X feature dimensions of every component.
+    gmm_X = sklearn.mixture.GaussianMixture(n_components=n_components, covariance_type=covariance_type, max_iter=max_iter)
+    gmm_X.weights_ = gmm_XY.weights_
+    gmm_X.means_ = gmm_XY.means_[:, :dim_X]
+    gmm_X.covariances_ = gmm_XY.covariances_[:, :dim_X]
+    gmm_X.precisions_ = gmm_XY.precisions_[:, :dim_X]
+    gmm_X.precisions_cholesky_ = gmm_XY.precisions_cholesky_[:, :dim_X]
+
+    # The marginal over Y uses the remaining feature dimensions.
+    gmm_Y = sklearn.mixture.GaussianMixture(n_components=n_components, covariance_type=covariance_type, max_iter=max_iter)
+    gmm_Y.weights_ = gmm_XY.weights_
+    gmm_Y.means_ = gmm_XY.means_[:, dim_X:]
+    gmm_Y.covariances_ = gmm_XY.covariances_[:, dim_X:]
+    gmm_Y.precisions_ = gmm_XY.precisions_[:, dim_X:]
+    gmm_Y.precisions_cholesky_ = gmm_XY.precisions_cholesky_[:, dim_X:]
+
+    # Permuting Y yields samples from the product of the marginals, so the
+    # mean of r * log(r), with r the joint-to-marginal density ratio, is a
+    # Monte Carlo estimate of the mutual information in nats.
+    rand_perm = np.random.permutation(Y.shape[0])
+    Y_perm = Y[rand_perm]
+    XY_perm = np.concatenate([X, Y_perm], axis=1)
+    temp = gmm_XY.score_samples(XY_perm) - gmm_X.score_samples(X) - gmm_Y.score_samples(Y_perm)
+    temp_exp = np.exp(temp)
+    mi = np.mean(temp_exp * temp)
+    # Convert from nats (natural logarithm) to bits (log base 2).
+    mi = mi / np.log(2)
+    return mi
+
+
 class MutualInformationAnalyzer(ACSDAnalyzer):
     """
     Analyzer based on mutual information analysis.
@@ -48,36 +110,19 @@ def __init__(
             descriptor_calculator=descriptor_calculator,
         )

-    @staticmethod
-    def __get_gmm(
-        data,
-        n_components=48,
-        max_iter=1000,
-        covariance_type="diag",
-        reg_covar=1e-6,
-    ):
-        gmm = sklearn.mixture.GaussianMixture(
-            n_components=n_components,
-            max_iter=max_iter,
-            covariance_type=covariance_type,
-            reg_covar=reg_covar,
-        )
-        gmm.fit(data)
-        return gmm
-
     @staticmethod
     def _calculate_acsd(
         descriptor_data,
         ldos_data,
-        acsd_points,
+        n_samples,
         descriptor_vectors_contain_xyz=True,
     ):
         """
-        Calculate the ACSD for given descriptor and LDOS data.
+        Calculate the mutual information for given descriptor and LDOS data.

-        ACSD stands for average cosine similarity distance and is a metric
-        of how well the descriptors capture the local environment to a
-        degree where similar descriptor vectors result in simlar LDOS vectors.
+        The mutual information measures how much of the information needed
+        to predict the LDOS is captured by the descriptors. It is reported
+        in bits.

         Parameters
         ----------
@@ -86,21 +131,15 @@ def _calculate_acsd(

         ldos_data : numpy.ndarray
             Array containing the LDOS.
-
-        descriptor_vectors_contain_xyz : bool
-            If true, the xyz values are cut from the beginning of the
-            descriptor vectors.
-
-        acsd_points : int
-            The number of points for which to calculate the ACSD.
-            The actual number of distances will be acsd_points x acsd_points,
-            since the cosine similarity is only defined for pairs.
+
+        n_samples : int
+            The number of samples used to estimate the mutual information.

         Returns
         -------
-        acsd : float
-            The average cosine similarity distance.
-
+        mi : float
+            The mutual information between the descriptors and the LDOS in bits.
+
         """
         descriptor_dim = np.shape(descriptor_data)
         ldos_dim = np.shape(ldos_data)
@@ -125,77 +164,25 @@
         elif len(ldos_dim) != 2:
             raise Exception("Cannot work with this LDOS data.")

-        n_components = 48
-        max_iter = 1000
-        rand_perm = np.random.permutation(ldos_data.shape[0])
-        perm_train = rand_perm[:acsd_points]
-        X_train = descriptor_data[perm_train]
-        Y_train = ldos_data[perm_train]
-        covariance = sklearn.covariance.EmpiricalCovariance()
-        covariance.fit(Y_train)
         plot = False
         if plot:
+            rand_perm = np.random.permutation(ldos_data.shape[0])
+            perm_train = rand_perm[:n_samples]
+            X_train = descriptor_data[perm_train]
+            Y_train = ldos_data[perm_train]
+            covariance = sklearn.covariance.EmpiricalCovariance()
+            covariance.fit(Y_train)
             plt.imshow(
                 covariance.covariance_, cmap="cool", interpolation="nearest"
             )
             plt.show()
-
-        scaler = Normalizer()
-        X_train = scaler.fit_transform(X_train)
-        covariance = sklearn.covariance.EmpiricalCovariance()
-        covariance.fit(X_train)
-        if plot:
+
+            covariance = sklearn.covariance.EmpiricalCovariance()
+            covariance.fit(X_train)
             plt.imshow(
                 covariance.covariance_, cmap="hot", interpolation="nearest"
             )
             plt.show()
-
-        X_shuffled = X_train[np.random.permutation(X_train.shape[0])]
-
-        combined_train = np.concatenate((X_train, Y_train), axis=1)
-        combined_shuffled = np.concatenate((X_shuffled, Y_train), axis=1)
-
-        # Apply a variance filter
-        variances_ldos = np.var(Y_train, axis=0)
-        variances_bisprectrum = np.var(X_train, axis=0)
-        filtered = np.shape(variances_bisprectrum > 1.0)
-
-        gmm_Y = MutualInformationAnalyzer.__get_gmm(
-            Y_train,
-            n_components=int(ldos_dim[0] / 2),
-            covariance_type="diag",
-        )
-
-        gmm_X = MutualInformationAnalyzer.__get_gmm(
-            X_train,
-            n_components=int(descriptor_dim[0] / 2),
-            covariance_type="diag",
-            reg_covar=1e-5,
-        )
-
-        log_p_X = gmm_X.score_samples(X_train)
-        log_p_Y = gmm_Y.score_samples(Y_train)
-
-        gmm_combined = MutualInformationAnalyzer.__get_gmm(
-            combined_train,
-            n_components=int(descriptor_dim[0] / 2),
-            max_iter=max_iter,
-            covariance_type="diag",
-        )
-        gmm_combined_shuffled = MutualInformationAnalyzer.__get_gmm(
-            combined_shuffled,
-            n_components=int(descriptor_dim[0] / 2),
-            max_iter=max_iter,
-            covariance_type="diag",
-        )
-
-        log_p_combined = gmm_combined.score_samples(combined_train)
-        mi = np.mean(log_p_combined) - np.mean(log_p_Y) - np.mean(log_p_X)
-
-        log_p_shuffled = gmm_combined_shuffled.score_samples(combined_shuffled)
-        mi_shuffled = (
-            np.mean(log_p_shuffled) - np.mean(log_p_Y) - np.mean(log_p_X)
-        )
-
-        mi_difference = mi - mi_shuffled
-        return mi_difference
+        # The hyperparameters could potentially be moved into the params.
+        mi = mutual_information(descriptor_data, ldos_data, n_components=None, n_samples=n_samples, covariance_type="diag", normalize_data=True)
+        return mi
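
Note on the estimator: once the joint model gmm_XY and the sliced marginals
gmm_X and gmm_Y are in place, permuting Y turns the paired samples into
(approximately) independent draws from the product of the marginals, so the
score_samples differences are log density ratios. The code therefore
implements the importance-sampled form of the KL divergence (the notation
below is ours, not from the codebase):

    I(X;Y) = D_{KL}(p_{XY} \| p_X p_Y)
           = \mathbb{E}_{(x,y) \sim p_X p_Y}[ r(x,y) \log r(x,y) ],
    \quad r(x,y) = \frac{p_{XY}(x,y)}{p_X(x)\, p_Y(y)}

In the code, temp is log r evaluated at the permuted pairs, temp_exp * temp
is r log r, and the sample mean gives the Monte Carlo estimate in nats before
the final division by log(2) converts it to bits.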
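
For a quick sanity check of the new helper, the following is a minimal sketch
one might run against the patched module. The import path matches the file
touched by this patch; the synthetic data, the seed, and the component count
are illustrative assumptions, not part of the patch:

    import numpy as np

    # `mutual_information` is the module-level helper added by this patch.
    from mala.network.mutual_information_analyzer import mutual_information

    rng = np.random.default_rng(0)
    n, rho = 50000, 0.9

    # Correlated Gaussian pair with analytically known mutual information:
    # I(X; Y) = -0.5 * log2(1 - rho^2), i.e. about 1.2 bits for rho = 0.9.
    x = rng.standard_normal(n)
    y = rho * x + np.sqrt(1.0 - rho**2) * rng.standard_normal(n)

    # n_components is set explicitly: the default d // 2 would give a single
    # diagonal-covariance component, which cannot represent the X-Y
    # correlation at all.
    mi_est = mutual_information(
        x[:, None], y[:, None], n_components=16, n_samples=n
    )
    mi_true = -0.5 * np.log2(1.0 - rho**2)
    print(f"estimated: {mi_est:.2f} bits, analytic: {mi_true:.2f} bits")

The estimator is stochastic (a GMM fit plus a random permutation), so the
printed value fluctuates around the analytic one; agreement to within a few
tenths of a bit is what the diagonal-mixture approximation can be expected to
deliver here.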