Skip to content

Commit

Permalink
Fixed Mutual Information formula
Browse files Browse the repository at this point in the history
  • Loading branch information
nerkulec committed Nov 11, 2024
1 parent 87edede commit fbf811d
Showing 1 changed file with 75 additions and 99 deletions.
174 changes: 75 additions & 99 deletions mala/network/mutual_information_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,63 @@
import sklearn.mixture
import sklearn.covariance
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler, Normalizer

descriptor_input_types_acsd = descriptor_input_types + ["numpy", "openpmd"]
target_input_types_acsd = target_input_types + ["numpy", "openpmd"]


def normalize(data):
mean = np.mean(data, axis=0)
std = np.std(data, axis=0)
std_nonzero = std > 1e-6
data = data[:, std_nonzero]
mean = mean[std_nonzero]
std = std[std_nonzero]
data = (data - mean) / std
return data

def mutual_information(X, Y, n_components=None, max_iter=1000, n_samples=100000, covariance_type='diag', normalize_data=False):
assert covariance_type == 'diag', "Only support covariance_type='diag' for now"
n = X.shape[0]
dim_X = X.shape[-1]
rand_subset = np.random.permutation(n)[:n_samples]
if normalize_data:
X = normalize(X)
Y = normalize(Y)
X = X[rand_subset]
Y = Y[rand_subset]
XY = np.concatenate([X, Y], axis=1)
d = XY.shape[-1]
if n_components is None:
n_components = d//2
gmm_XY = sklearn.mixture.GaussianMixture(n_components=n_components, covariance_type=covariance_type, max_iter=max_iter)
gmm_XY.fit(XY)

gmm_X = sklearn.mixture.GaussianMixture(n_components=n_components, covariance_type=covariance_type, max_iter=max_iter)
gmm_X.weights_ = gmm_XY.weights_
gmm_X.means_ = gmm_XY.means_[:, :dim_X]
gmm_X.covariances_ = gmm_XY.covariances_[:, :dim_X]
gmm_X.precisions_ = gmm_XY.precisions_[:, :dim_X]
gmm_X.precisions_cholesky_ = gmm_XY.precisions_cholesky_[:, :dim_X]

gmm_Y = sklearn.mixture.GaussianMixture(n_components=n_components, covariance_type=covariance_type, max_iter=max_iter)
gmm_Y.weights_ = gmm_XY.weights_
gmm_Y.means_ = gmm_XY.means_[:, dim_X:]
gmm_Y.covariances_ = gmm_XY.covariances_[:, dim_X:]
gmm_Y.precisions_ = gmm_XY.precisions_[:, dim_X:]
gmm_Y.precisions_cholesky_ = gmm_XY.precisions_cholesky_[:, dim_X:]

rand_perm = np.random.permutation(Y.shape[0])
Y_perm = Y[rand_perm]
XY_perm = np.concatenate([X, Y_perm], axis=1)
temp = gmm_XY.score_samples(XY_perm) - gmm_X.score_samples(X) - gmm_Y.score_samples(Y_perm)
temp_exp = np.exp(temp)
mi = np.mean(temp_exp*temp)
# change log base e to log base 2
mi = mi / np.log(2)
return mi


class MutualInformationAnalyzer(ACSDAnalyzer):
"""
Analyzer based on mutual information analysis.
Expand Down Expand Up @@ -48,36 +99,19 @@ def __init__(
descriptor_calculator=descriptor_calculator,
)

@staticmethod
def __get_gmm(
data,
n_components=48,
max_iter=1000,
covariance_type="diag",
reg_covar=1e-6,
):
gmm = sklearn.mixture.GaussianMixture(
n_components=n_components,
max_iter=max_iter,
covariance_type=covariance_type,
reg_covar=reg_covar,
)
gmm.fit(data)
return gmm

@staticmethod
def _calculate_acsd(
descriptor_data,
ldos_data,
acsd_points,
n_samples,
descriptor_vectors_contain_xyz=True,
):
"""
Calculate the ACSD for given descriptor and LDOS data.
Calculate the Mutual Information for given descriptor and LDOS data.
ACSD stands for average cosine similarity distance and is a metric
of how well the descriptors capture the local environment to a
degree where similar descriptor vectors result in simlar LDOS vectors.
Mutual Information measures how well the descriptors capture the
relevant information that is needed to predict the LDOS.
The unit of MI is bits.
Parameters
----------
Expand All @@ -86,21 +120,15 @@ def _calculate_acsd(
ldos_data : numpy.ndarray
Array containing the LDOS.
descriptor_vectors_contain_xyz : bool
If true, the xyz values are cut from the beginning of the
descriptor vectors.
acsd_points : int
The number of points for which to calculate the ACSD.
The actual number of distances will be acsd_points x acsd_points,
since the cosine similarity is only defined for pairs.
n_samples : int
The number of points for which to calculate the mutual information.
Returns
-------
acsd : float
The average cosine similarity distance.
mi : float
The mutual information between the descriptor and the LDOS in bits.
"""
descriptor_dim = np.shape(descriptor_data)
ldos_dim = np.shape(ldos_data)
Expand All @@ -125,77 +153,25 @@ def _calculate_acsd(
elif len(ldos_dim) != 2:
raise Exception("Cannot work with this LDOS data.")

n_components = 48
max_iter = 1000
rand_perm = np.random.permutation(ldos_data.shape[0])
perm_train = rand_perm[:acsd_points]
X_train = descriptor_data[perm_train]
Y_train = ldos_data[perm_train]
covariance = sklearn.covariance.EmpiricalCovariance()
covariance.fit(Y_train)
plot = False
if plot:
rand_perm = np.random.permutation(ldos_data.shape[0])
perm_train = rand_perm[:n_samples]
X_train = descriptor_data[perm_train]
Y_train = ldos_data[perm_train]
covariance = sklearn.covariance.EmpiricalCovariance()
covariance.fit(Y_train)
plt.imshow(
covariance.covariance_, cmap="cool", interpolation="nearest"
)
plt.show()

scaler = Normalizer()
X_train = scaler.fit_transform(X_train)
covariance = sklearn.covariance.EmpiricalCovariance()
covariance.fit(X_train)
if plot:

covariance = sklearn.covariance.EmpiricalCovariance()
covariance.fit(X_train)
plt.imshow(
covariance.covariance_, cmap="hot", interpolation="nearest"
)
plt.show()

X_shuffled = X_train[np.random.permutation(X_train.shape[0])]

combined_train = np.concatenate((X_train, Y_train), axis=1)
combined_shuffled = np.concatenate((X_shuffled, Y_train), axis=1)

# Apply a variance filter
variances_ldos = np.var(Y_train, axis=0)
variances_bisprectrum = np.var(X_train, axis=0)
filtered = np.shape(variances_bisprectrum > 1.0)

gmm_Y = MutualInformationAnalyzer.__get_gmm(
Y_train,
n_components=int(ldos_dim[0] / 2),
covariance_type="diag",
)

gmm_X = MutualInformationAnalyzer.__get_gmm(
X_train,
n_components=int(descriptor_dim[0] / 2),
covariance_type="diag",
reg_covar=1e-5,
)

log_p_X = gmm_X.score_samples(X_train)
log_p_Y = gmm_Y.score_samples(Y_train)

gmm_combined = MutualInformationAnalyzer.__get_gmm(
combined_train,
n_components=int(descriptor_dim[0] / 2),
max_iter=max_iter,
covariance_type="diag",
)
gmm_combined_shuffled = MutualInformationAnalyzer.__get_gmm(
combined_shuffled,
n_components=int(descriptor_dim[0] / 2),
max_iter=max_iter,
covariance_type="diag",
)

log_p_combined = gmm_combined.score_samples(combined_train)
mi = np.mean(log_p_combined) - np.mean(log_p_Y) - np.mean(log_p_X)

log_p_shuffled = gmm_combined_shuffled.score_samples(combined_shuffled)
mi_shuffled = (
np.mean(log_p_shuffled) - np.mean(log_p_Y) - np.mean(log_p_X)
)

mi_difference = mi - mi_shuffled
return mi_difference
# The hyperparameters could be put potentially into the params.
mi = mutual_information(X_train, Y_train, n_components=None, n_samples=n_samples, covariance_type='diag', normalize_data=True)
return mi

0 comments on commit fbf811d

Please sign in to comment.