diff --git a/mala/__init__.py b/mala/__init__.py index 6646077b5..5c578bf3b 100644 --- a/mala/__init__.py +++ b/mala/__init__.py @@ -40,9 +40,10 @@ HyperparameterOAT, HyperparameterNASWOT, HyperparameterOptuna, - HyperparameterACSD, + HyperparameterDescriptorScoring, ACSDAnalyzer, Runner, + MutualInformationAnalyzer, ) from .targets import LDOS, DOS, Density, fermi_function, AtomicForce, Target from .interfaces import MALA diff --git a/mala/common/parameters.py b/mala/common/parameters.py index 30ce695ba..3e4c531c5 100644 --- a/mala/common/parameters.py +++ b/mala/common/parameters.py @@ -1101,6 +1101,7 @@ def __init__(self): # For accelerated hyperparameter optimization. self.acsd_points = 100 + self.mutual_information_points = 20000 @property def rdb_storage_heartbeat(self): diff --git a/mala/network/__init__.py b/mala/network/__init__.py index eaa50c125..e058688aa 100644 --- a/mala/network/__init__.py +++ b/mala/network/__init__.py @@ -11,6 +11,7 @@ from .hyperparameter_oat import HyperparameterOAT from .hyperparameter_naswot import HyperparameterNASWOT from .hyperparameter_optuna import HyperparameterOptuna -from .hyperparameter_acsd import HyperparameterACSD +from .hyperparameter_descriptor_scoring import HyperparameterDescriptorScoring from .acsd_analyzer import ACSDAnalyzer +from .mutual_information_analyzer import MutualInformationAnalyzer from .runner import Runner diff --git a/mala/network/acsd_analyzer.py b/mala/network/acsd_analyzer.py index 317a425f1..049a1d824 100644 --- a/mala/network/acsd_analyzer.py +++ b/mala/network/acsd_analyzer.py @@ -1,28 +1,17 @@ """Class for performing a full ACSD analysis.""" -import itertools -import os - import numpy as np from mala.datahandling.data_converter import ( descriptor_input_types, target_input_types, ) -from mala.descriptors.descriptor import Descriptor -from mala.targets.target import Target -from mala.network.hyperparameter import Hyperparameter -from mala.network.hyper_opt import HyperOpt -from mala.common.parallelizer import get_rank, printout -from mala.descriptors.bispectrum import Bispectrum -from mala.descriptors.atomic_density import AtomicDensity -from mala.descriptors.minterpy_descriptors import MinterpyDescriptors - -descriptor_input_types_acsd = descriptor_input_types + ["numpy", "openpmd"] -target_input_types_acsd = target_input_types + ["numpy", "openpmd"] +from mala.network.descriptor_scoring_optimizer import ( + DescriptorScoringOptimizer, +) -class ACSDAnalyzer(HyperOpt): +class ACSDAnalyzer(DescriptorScoringOptimizer): """ Analyzer based on the ACSD analysis. @@ -47,673 +36,23 @@ class ACSDAnalyzer(HyperOpt): def __init__( self, params, target_calculator=None, descriptor_calculator=None ): - super(ACSDAnalyzer, self).__init__(params) - # Calculators used to parse data from compatible files. - self._target_calculator = target_calculator - if self._target_calculator is None: - self._target_calculator = Target(params) - self._descriptor_calculator = descriptor_calculator - if self._descriptor_calculator is None: - self._descriptor_calculator = Descriptor(params) - if ( - not isinstance(self._descriptor_calculator, Bispectrum) - and not isinstance(self._descriptor_calculator, AtomicDensity) - and not isinstance( - self._descriptor_calculator, MinterpyDescriptors - ) - ): - raise Exception( - "Cannot calculate ACSD for the selected descriptors." - ) - - # Internal variables. - self.__snapshots = [] - self.__snapshot_description = [] - self.__snapshot_units = [] - - # Filled after the analysis. - self._labels = [] - self._study = [] - self._reduced_study = None - self._internal_hyperparam_list = None - - def add_snapshot( - self, - descriptor_input_type=None, - descriptor_input_path=None, - target_input_type=None, - target_input_path=None, - descriptor_units=None, - target_units=None, - ): - """ - Add a snapshot to be processed. - - Parameters - ---------- - descriptor_input_type : string - Type of descriptor data to be processed. - See mala.datahandling.data_converter.descriptor_input_types - for options. - - descriptor_input_path : string - Path of descriptor data to be processed. - - target_input_type : string - Type of target data to be processed. - See mala.datahandling.data_converter.target_input_types - for options. - - target_input_path : string - Path of target data to be processed. - - descriptor_units : string - Units for descriptor data processing. - - target_units : string - Units for target data processing. - """ - # Check the input. - if descriptor_input_type is not None: - if descriptor_input_path is None: - raise Exception( - "Cannot process descriptor data with no path given." - ) - if descriptor_input_type not in descriptor_input_types_acsd: - raise Exception("Cannot process this type of descriptor data.") - else: - raise Exception("Cannot calculate ACSD without descriptor data.") - - if target_input_type is not None: - if target_input_path is None: - raise Exception( - "Cannot process target data with no path given." - ) - if target_input_type not in target_input_types_acsd: - raise Exception("Cannot process this type of target data.") - else: - raise Exception("Cannot calculate ACSD without target data.") - - # Assign info. - self.__snapshots.append( - {"input": descriptor_input_path, "output": target_input_path} - ) - self.__snapshot_description.append( - {"input": descriptor_input_type, "output": target_input_type} - ) - self.__snapshot_units.append( - {"input": descriptor_units, "output": target_units} - ) - - def add_hyperparameter(self, name, choices): - """ - Add a hyperparameter to the current investigation. - - Parameters - ---------- - name : string - Name of the hyperparameter. Please note that these names always - have to be the same as the parameter names in - ParametersDescriptors. - - choices : - List of possible choices. - """ - if name not in [ - "bispectrum_twojmax", - "bispectrum_cutoff", - "atomic_density_sigma", - "atomic_density_cutoff", - "minterpy_cutoff_cube_size", - "minterpy_polynomial_degree", - "minterpy_lp_norm", - ]: - raise Exception("Unkown hyperparameter for ACSD analysis entered.") - - self.params.hyperparameters.hlist.append( - Hyperparameter( - hotype="acsd", - name=name, - choices=choices, - opttype="categorical", - ) - ) - - def perform_study( - self, file_based_communication=False, return_plotting=False - ): - """ - Perform the study, i.e. the optimization. - - This is done by sampling different descriptors, calculated with - different hyperparameters and then calculating the ACSD. - """ - # Prepare the hyperparameter lists. - self._construct_hyperparam_list() - hyperparameter_tuples = list( - itertools.product(*self._internal_hyperparam_list) + super(ACSDAnalyzer, self).__init__( + params, + target_calculator=target_calculator, + descriptor_calculator=descriptor_calculator, ) - # Perform the ACSD analysis separately for each snapshot. - best_acsd = None - best_trial = None - for i in range(0, len(self.__snapshots)): - printout( - "Starting ACSD analysis of snapshot", str(i), min_verbosity=1 - ) - current_list = [] - target = self._load_target( - self.__snapshots[i], - self.__snapshot_description[i], - self.__snapshot_units[i], - file_based_communication, - ) - - for idx, hyperparameter_tuple in enumerate(hyperparameter_tuples): - if isinstance(self._descriptor_calculator, Bispectrum): - self.params.descriptors.bispectrum_cutoff = ( - hyperparameter_tuple[0] - ) - self.params.descriptors.bispectrum_twojmax = ( - hyperparameter_tuple[1] - ) - elif isinstance(self._descriptor_calculator, AtomicDensity): - self.params.descriptors.atomic_density_cutoff = ( - hyperparameter_tuple[0] - ) - self.params.descriptors.atomic_density_sigma = ( - hyperparameter_tuple[1] - ) - elif isinstance( - self._descriptor_calculator, MinterpyDescriptors - ): - self.params.descriptors.atomic_density_cutoff = ( - hyperparameter_tuple[0] - ) - self.params.descriptors.atomic_density_sigma = ( - hyperparameter_tuple[1] - ) - self.params.descriptors.minterpy_cutoff_cube_size = ( - hyperparameter_tuple[2] - ) - self.params.descriptors.minterpy_polynomial_degree = ( - hyperparameter_tuple[3] - ) - self.params.descriptors.minterpy_lp_norm = ( - hyperparameter_tuple[4] - ) - - descriptor = self._calculate_descriptors( - self.__snapshots[i], - self.__snapshot_description[i], - self.__snapshot_units[i], - ) - if get_rank() == 0: - acsd = self._calculate_acsd( - descriptor, - target, - self.params.hyperparameters.acsd_points, - descriptor_vectors_contain_xyz=self.params.descriptors.descriptors_contain_xyz, - ) - if not np.isnan(acsd): - if best_acsd is None: - best_acsd = acsd - best_trial = idx - elif acsd < best_acsd: - best_acsd = acsd - best_trial = idx - current_list.append( - list(hyperparameter_tuple) + [acsd] - ) - else: - current_list.append( - list(hyperparameter_tuple) + [np.inf] - ) - - outstring = "[" - for label_id, label in enumerate(self._labels): - outstring += ( - label + ": " + str(hyperparameter_tuple[label_id]) - ) - if label_id < len(self._labels) - 1: - outstring += ", " - outstring += "]" - best_trial_string = ". No suitable trial found yet." - if best_acsd is not None: - best_trial_string = ( - ". Best trial is " - + str(best_trial) - + " with " - + str(best_acsd) - ) - - printout( - "Trial", - idx, - "finished with ACSD=" + str(acsd), - "and parameters:", - outstring + best_trial_string, - min_verbosity=1, - ) - - if get_rank() == 0: - self._study.append(current_list) - - if get_rank() == 0: - self._study = np.mean(self._study, axis=0) - - # TODO: Does this even make sense for the minterpy descriptors? - if return_plotting: - results_to_plot = [] - if len(self._internal_hyperparam_list) == 2: - len_first_dim = len(self._internal_hyperparam_list[0]) - len_second_dim = len(self._internal_hyperparam_list[1]) - for i in range(0, len_first_dim): - results_to_plot.append( - self._study[ - i * len_second_dim : (i + 1) * len_second_dim, - 2:, - ] - ) - - if isinstance(self._descriptor_calculator, Bispectrum): - return results_to_plot, { - "twojmax": self._internal_hyperparam_list[1], - "cutoff": self._internal_hyperparam_list[0], - } - if isinstance(self._descriptor_calculator, AtomicDensity): - return results_to_plot, { - "sigma": self._internal_hyperparam_list[1], - "cutoff": self._internal_hyperparam_list[0], - } - - def set_optimal_parameters(self): - """ - Set the optimal parameters found in the present study. - - The parameters will be written to the parameter object with which the - hyperparameter optimizer was created. - """ - if get_rank() == 0: - minimum_acsd = self._study[np.argmin(self._study[:, -1])] - if len(self._internal_hyperparam_list) == 2: - if isinstance(self._descriptor_calculator, Bispectrum): - self.params.descriptors.bispectrum_cutoff = minimum_acsd[0] - self.params.descriptors.bispectrum_twojmax = int( - minimum_acsd[1] - ) - printout( - "ACSD analysis finished, optimal parameters: ", - ) - printout( - "Bispectrum twojmax: ", - self.params.descriptors.bispectrum_twojmax, - ) - printout( - "Bispectrum cutoff: ", - self.params.descriptors.bispectrum_cutoff, - ) - if isinstance(self._descriptor_calculator, AtomicDensity): - self.params.descriptors.atomic_density_cutoff = ( - minimum_acsd[0] - ) - self.params.descriptors.atomic_density_sigma = ( - minimum_acsd[1] - ) - printout( - "ACSD analysis finished, optimal parameters: ", - ) - printout( - "Atomic density sigma: ", - self.params.descriptors.atomic_density_sigma, - ) - printout( - "Atomic density cutoff: ", - self.params.descriptors.atomic_density_cutoff, - ) - elif len(self._internal_hyperparam_list) == 5: - if isinstance( - self._descriptor_calculator, MinterpyDescriptors - ): - self.params.descriptors.atomic_density_cutoff = ( - minimum_acsd[0] - ) - self.params.descriptors.atomic_density_sigma = ( - minimum_acsd[1] - ) - self.params.descriptors.minterpy_cutoff_cube_size = ( - minimum_acsd[2] - ) - self.params.descriptors.minterpy_polynomial_degree = int( - minimum_acsd[3] - ) - self.params.descriptors.minterpy_lp_norm = int( - minimum_acsd[4] - ) - printout( - "ACSD analysis finished, optimal parameters: ", - ) - printout( - "Atomic density sigma: ", - self.params.descriptors.atomic_density_sigma, - ) - printout( - "Atomic density cutoff: ", - self.params.descriptors.atomic_density_cutoff, - ) - printout( - "Minterpy cube cutoff: ", - self.params.descriptors.minterpy_cutoff_cube_size, - ) - printout( - "Minterpy polynomial degree: ", - self.params.descriptors.minterpy_polynomial_degree, - ) - printout( - "Minterpy LP norm degree: ", - self.params.descriptors.minterpy_lp_norm, - ) - - def _construct_hyperparam_list(self): - if isinstance(self._descriptor_calculator, Bispectrum): - if ( - list( - map( - lambda p: "bispectrum_cutoff" in p.name, - self.params.hyperparameters.hlist, - ) - ).count(True) - == 0 - ): - first_dim_list = [self.params.descriptors.bispectrum_cutoff] - else: - first_dim_list = self.params.hyperparameters.hlist[ - list( - map( - lambda p: "bispectrum_cutoff" in p.name, - self.params.hyperparameters.hlist, - ) - ).index(True) - ].choices - - if ( - list( - map( - lambda p: "bispectrum_twojmax" in p.name, - self.params.hyperparameters.hlist, - ) - ).count(True) - == 0 - ): - second_dim_list = [self.params.descriptors.bispectrum_twojmax] - else: - second_dim_list = self.params.hyperparameters.hlist[ - list( - map( - lambda p: "bispectrum_twojmax" in p.name, - self.params.hyperparameters.hlist, - ) - ).index(True) - ].choices - - self._internal_hyperparam_list = [first_dim_list, second_dim_list] - self._labels = ["cutoff", "twojmax"] - - elif isinstance(self._descriptor_calculator, AtomicDensity): - if ( - list( - map( - lambda p: "atomic_density_cutoff" in p.name, - self.params.hyperparameters.hlist, - ) - ).count(True) - == 0 - ): - first_dim_list = [ - self.params.descriptors.atomic_density_cutoff - ] - else: - first_dim_list = self.params.hyperparameters.hlist[ - list( - map( - lambda p: "atomic_density_cutoff" in p.name, - self.params.hyperparameters.hlist, - ) - ).index(True) - ].choices - - if ( - list( - map( - lambda p: "atomic_density_sigma" in p.name, - self.params.hyperparameters.hlist, - ) - ).count(True) - == 0 - ): - second_dim_list = [ - self.params.descriptors.atomic_density_sigma - ] - else: - second_dim_list = self.params.hyperparameters.hlist[ - list( - map( - lambda p: "atomic_density_sigma" in p.name, - self.params.hyperparameters.hlist, - ) - ).index(True) - ].choices - self._internal_hyperparam_list = [first_dim_list, second_dim_list] - self._labels = ["cutoff", "sigma"] - - elif isinstance(self._descriptor_calculator, MinterpyDescriptors): - if ( - list( - map( - lambda p: "atomic_density_cutoff" in p.name, - self.params.hyperparameters.hlist, - ) - ).count(True) - == 0 - ): - first_dim_list = [ - self.params.descriptors.atomic_density_cutoff - ] - else: - first_dim_list = self.params.hyperparameters.hlist[ - list( - map( - lambda p: "atomic_density_cutoff" in p.name, - self.params.hyperparameters.hlist, - ) - ).index(True) - ].choices - - if ( - list( - map( - lambda p: "atomic_density_sigma" in p.name, - self.params.hyperparameters.hlist, - ) - ).count(True) - == 0 - ): - second_dim_list = [ - self.params.descriptors.atomic_density_sigma - ] - else: - second_dim_list = self.params.hyperparameters.hlist[ - list( - map( - lambda p: "atomic_density_sigma" in p.name, - self.params.hyperparameters.hlist, - ) - ).index(True) - ].choices - - if ( - list( - map( - lambda p: "minterpy_cutoff_cube_size" in p.name, - self.params.hyperparameters.hlist, - ) - ).count(True) - == 0 - ): - third_dim_list = [ - self.params.descriptors.minterpy_cutoff_cube_size - ] - else: - third_dim_list = self.params.hyperparameters.hlist[ - list( - map( - lambda p: "minterpy_cutoff_cube_size" in p.name, - self.params.hyperparameters.hlist, - ) - ).index(True) - ].choices - - if ( - list( - map( - lambda p: "minterpy_polynomial_degree" in p.name, - self.params.hyperparameters.hlist, - ) - ).count(True) - == 0 - ): - fourth_dim_list = [ - self.params.descriptors.minterpy_polynomial_degree - ] - else: - fourth_dim_list = self.params.hyperparameters.hlist[ - list( - map( - lambda p: "minterpy_polynomial_degree" in p.name, - self.params.hyperparameters.hlist, - ) - ).index(True) - ].choices - - if ( - list( - map( - lambda p: "minterpy_lp_norm" in p.name, - self.params.hyperparameters.hlist, - ) - ).count(True) - == 0 - ): - fifth_dim_list = [self.params.descriptors.minterpy_lp_norm] - else: - fifth_dim_list = self.params.hyperparameters.hlist[ - list( - map( - lambda p: "minterpy_lp_norm" in p.name, - self.params.hyperparameters.hlist, - ) - ).index(True) - ].choices - - self._internal_hyperparam_list = [ - first_dim_list, - second_dim_list, - third_dim_list, - fourth_dim_list, - fifth_dim_list, - ] - self._labels = [ - "cutoff", - "sigma", - "minterpy_cutoff", - "minterpy_polynomial_degree", - "minterpy_lp_norm", - ] - - else: - raise Exception( - "Unkown descriptor calculator selected. Cannot " - "calculate ACSD." - ) - - def _calculate_descriptors(self, snapshot, description, original_units): - descriptor_calculation_kwargs = {} - tmp_input = None - if description["input"] == "espresso-out": - descriptor_calculation_kwargs["units"] = original_units["input"] - tmp_input, local_size = ( - self._descriptor_calculator.calculate_from_qe_out( - snapshot["input"], **descriptor_calculation_kwargs - ) - ) - - elif description["input"] is None: - # In this case, only the output is processed. - pass - - else: - raise Exception( - "Unknown file extension, cannot convert descriptor" - ) - if self.params.descriptors._configuration["mpi"]: - tmp_input = self._descriptor_calculator.gather_descriptors( - tmp_input - ) - - return tmp_input - - def _load_target( - self, snapshot, description, original_units, file_based_communication - ): - memmap = None - if ( - self.params.descriptors._configuration["mpi"] - and file_based_communication - ): - memmap = "acsd.out.npy_temp" - - target_calculator_kwargs = {} - - # Read the output data - tmp_output = None - if description["output"] == ".cube": - target_calculator_kwargs["units"] = original_units["output"] - target_calculator_kwargs["use_memmap"] = memmap - # If no units are provided we just assume standard units. - tmp_output = self._target_calculator.read_from_cube( - snapshot["output"], **target_calculator_kwargs - ) - - elif description["output"] == ".xsf": - target_calculator_kwargs["units"] = original_units["output"] - target_calculator_kwargs["use_memmap"] = memmap - # If no units are provided we just assume standard units. - tmp_output = self._target_calculator.read_from_xsf( - snapshot["output"], **target_calculator_kwargs - ) - - elif description["output"] == "numpy": - if get_rank() == 0: - tmp_output = self._target_calculator.read_from_numpy_file( - snapshot["output"], units=original_units["output"] - ) - - elif description["output"] == "openpmd": - if get_rank() == 0: - tmp_output = self._target_calculator.read_from_numpy_file( - snapshot["output"], units=original_units["output"] - ) - else: - raise Exception("Unknown file extension, cannot convert target") - - if get_rank() == 0: - if ( - self.params.targets._configuration["mpi"] - and file_based_communication - ): - os.remove(memmap) + def _update_logging(self, score, index): + if self.best_score is None: + self.best_score = score + self.best_trial_index = index + elif score < self.best_score: + self.best_score = score + self.best_trial_index = index - return tmp_output + def _get_best_trial(self): + """Determine the best trial as given by this study.""" + return self._study[np.argmin(self._study[:, -1])] @staticmethod def _calculate_cosine_similarities( @@ -800,6 +139,14 @@ def _calculate_cosine_similarities( return np.array(similarity_array) + def _calculate_score(self, descriptor, target): + return self._calculate_acsd( + descriptor, + target, + self.params.hyperparameters.acsd_points, + descriptor_vectors_contain_xyz=self.params.descriptors.descriptors_contain_xyz, + ) + @staticmethod def _calculate_acsd( descriptor_data, @@ -837,10 +184,6 @@ def _calculate_acsd( The average cosine similarity distance. """ - - def distance_between_points(x1, y1, x2, y2): - return np.sqrt((x1 - x2) * (x1 - x2) + (y1 - y2) * (y1 - y2)) - similarity_data = ACSDAnalyzer._calculate_cosine_similarities( descriptor_data, ldos_data, @@ -848,16 +191,10 @@ def distance_between_points(x1, y1, x2, y2): descriptor_vectors_contain_xyz=descriptor_vectors_contain_xyz, ) data_size = np.shape(similarity_data)[0] - distances = [] - for i in range(0, data_size): - distances.append( - distance_between_points( - similarity_data[i, 0], - similarity_data[i, 1], - similarity_data[i, 0], - similarity_data[i, 0], - ) - ) + + # Subtracting LDOS similarities from bispectrum similiarities. + distances = similarity_data[:, 0] - similarity_data[:, 1] + distances = distances.clip(min=0) return np.mean(distances) @staticmethod diff --git a/mala/network/descriptor_scoring_optimizer.py b/mala/network/descriptor_scoring_optimizer.py new file mode 100644 index 000000000..11608f5d3 --- /dev/null +++ b/mala/network/descriptor_scoring_optimizer.py @@ -0,0 +1,554 @@ +"""Base class for ACSD, mutual information and related methods.""" + +from abc import abstractmethod, ABC +import itertools +import os + +import numpy as np + +from mala.datahandling.data_converter import ( + descriptor_input_types, + target_input_types, +) +from mala.descriptors.descriptor import Descriptor +from mala.targets.target import Target +from mala.network.hyperparameter import Hyperparameter +from mala.network.hyper_opt import HyperOpt +from mala.common.parallelizer import get_rank, printout +from mala.descriptors.bispectrum import Bispectrum +from mala.descriptors.atomic_density import AtomicDensity +from mala.descriptors.minterpy_descriptors import MinterpyDescriptors + +descriptor_input_types_descriptor_scoring = descriptor_input_types + [ + "numpy", + "openpmd", +] +target_input_types_descriptor_scoring = target_input_types + [ + "numpy", + "openpmd", +] + + +class DescriptorScoringOptimizer(HyperOpt, ABC): + """ + Base class for all training-free descriptor hyperparameter optimizers. + + These optimizer use alternative metrics ACSD, mutual information, etc. + to tune descriptor hyperparameters. + + Parameters + ---------- + params : mala.common.parametes.Parameters + Parameters used to create this hyperparameter optimizer. + + descriptor_calculator : mala.descriptors.descriptor.Descriptor + The descriptor calculator used for parsing/converting fingerprint + data. If None, the descriptor calculator will be created by this + object using the parameters provided. Default: None + + target_calculator : mala.targets.target.Target + Target calculator used for parsing/converting target data. If None, + the target calculator will be created by this object using the + parameters provided. Default: None + + Attributes + ---------- + best_score : float + Score associated with best-performing trial. + + best_trial_index : int + Index of best-performing trial + """ + + def __init__( + self, params, target_calculator=None, descriptor_calculator=None + ): + super(DescriptorScoringOptimizer, self).__init__(params) + # Calculators used to parse data from compatible files. + self._target_calculator = target_calculator + if self._target_calculator is None: + self._target_calculator = Target(params) + self._descriptor_calculator = descriptor_calculator + if self._descriptor_calculator is None: + self._descriptor_calculator = Descriptor(params) + if not isinstance( + self._descriptor_calculator, Bispectrum + ) and not isinstance(self._descriptor_calculator, AtomicDensity): + raise Exception("Unsupported descriptor type selected.") + + # Internal variables. + self._snapshots = [] + self._snapshot_description = [] + self._snapshot_units = [] + + # Filled after the analysis. + self._labels = [] + self._study = [] + self._reduced_study = None + self._internal_hyperparam_list = None + + # Logging metrics. + self.best_score = None + self.best_trial_index = None + + def add_snapshot( + self, + descriptor_input_type=None, + descriptor_input_path=None, + target_input_type=None, + target_input_path=None, + descriptor_units=None, + target_units=None, + ): + """ + Add a snapshot to be processed. + + Parameters + ---------- + descriptor_input_type : string + Type of descriptor data to be processed. + See mala.datahandling.data_converter.descriptor_input_types + for options. + + descriptor_input_path : string + Path of descriptor data to be processed. + + target_input_type : string + Type of target data to be processed. + See mala.datahandling.data_converter.target_input_types + for options. + + target_input_path : string + Path of target data to be processed. + + descriptor_units : string + Units for descriptor data processing. + + target_units : string + Units for target data processing. + """ + # Check the input. + if descriptor_input_type is not None: + if descriptor_input_path is None: + raise Exception( + "Cannot process descriptor data with no path given." + ) + if ( + descriptor_input_type + not in descriptor_input_types_descriptor_scoring + ): + raise Exception("Cannot process this type of descriptor data.") + else: + raise Exception( + "Cannot calculate scoring metrics without descriptor data." + ) + + if target_input_type is not None: + if target_input_path is None: + raise Exception( + "Cannot process target data with no path given." + ) + if target_input_type not in target_input_types_descriptor_scoring: + raise Exception("Cannot process this type of target data.") + else: + raise Exception( + "Cannot calculate scoring metrics without target data." + ) + + # Assign info. + self._snapshots.append( + {"input": descriptor_input_path, "output": target_input_path} + ) + self._snapshot_description.append( + {"input": descriptor_input_type, "output": target_input_type} + ) + self._snapshot_units.append( + {"input": descriptor_units, "output": target_units} + ) + + def add_hyperparameter(self, name, choices): + """ + Add a hyperparameter to the current investigation. + + Parameters + ---------- + name : string + Name of the hyperparameter. Please note that these names always + have to be the same as the parameter names in + ParametersDescriptors. + + choices : + List of possible choices. + """ + if name not in [ + "bispectrum_twojmax", + "bispectrum_cutoff", + "atomic_density_sigma", + "atomic_density_cutoff", + ]: + raise Exception( + "Unkown hyperparameter for training free descriptor" + "hyperparameter optimization entered." + ) + + self.params.hyperparameters.hlist.append( + Hyperparameter( + hotype="descriptor_scoring", + name=name, + choices=choices, + opttype="categorical", + ) + ) + + def perform_study( + self, file_based_communication=False, return_plotting=False + ): + """ + Perform the study, i.e. the optimization. + + This is done by sampling different descriptors, calculated with + different hyperparameters and then calculating some surrogate score. + """ + # Prepare the hyperparameter lists. + self._construct_hyperparam_list() + hyperparameter_tuples = list( + itertools.product(*self._internal_hyperparam_list) + ) + + # Perform the descriptor scoring analysis separately for each snapshot. + for i in range(0, len(self._snapshots)): + self.best_trial_index = None + self.best_score = None + printout( + "Starting descriptor scoring analysis of snapshot", + str(i), + min_verbosity=1, + ) + current_list = [] + target = self._load_target( + self._snapshots[i], + self._snapshot_description[i], + self._snapshot_units[i], + file_based_communication, + ) + + for idx, hyperparameter_tuple in enumerate(hyperparameter_tuples): + if isinstance(self._descriptor_calculator, Bispectrum): + self.params.descriptors.bispectrum_cutoff = ( + hyperparameter_tuple[0] + ) + self.params.descriptors.bispectrum_twojmax = ( + hyperparameter_tuple[1] + ) + elif isinstance(self._descriptor_calculator, AtomicDensity): + self.params.descriptors.atomic_density_cutoff = ( + hyperparameter_tuple[0] + ) + self.params.descriptors.atomic_density_sigma = ( + hyperparameter_tuple[1] + ) + + descriptor = self._calculate_descriptors( + self._snapshots[i], + self._snapshot_description[i], + self._snapshot_units[i], + ) + if get_rank() == 0: + score = self._calculate_score( + descriptor, + target, + ) + if not np.isnan(score): + self._update_logging(score, idx) + current_list.append( + list(hyperparameter_tuple) + [score] + ) + else: + current_list.append( + list(hyperparameter_tuple) + [np.inf] + ) + + outstring = "[" + for label_id, label in enumerate(self._labels): + outstring += ( + label + ": " + str(hyperparameter_tuple[label_id]) + ) + if label_id < len(self._labels) - 1: + outstring += ", " + outstring += "]" + best_trial_string = ". No suitable trial found yet." + if self.best_score is not None: + best_trial_string = ( + ". Best trial is " + + str(self.best_trial_index) + + " with " + + str(self.best_score) + ) + + printout( + "Trial", + idx, + "finished with score=" + str(score), + "and parameters:", + outstring + best_trial_string, + min_verbosity=1, + ) + + if get_rank() == 0: + self._study.append(current_list) + + if get_rank() == 0: + self._study = np.mean(self._study, axis=0) + + if return_plotting: + results_to_plot = [] + if len(self._internal_hyperparam_list) == 2: + len_first_dim = len(self._internal_hyperparam_list[0]) + len_second_dim = len(self._internal_hyperparam_list[1]) + for i in range(0, len_first_dim): + results_to_plot.append( + self._study[ + i * len_second_dim : (i + 1) * len_second_dim, + 2:, + ] + ) + + if isinstance(self._descriptor_calculator, Bispectrum): + return results_to_plot, { + "twojmax": self._internal_hyperparam_list[1], + "cutoff": self._internal_hyperparam_list[0], + } + if isinstance(self._descriptor_calculator, AtomicDensity): + return results_to_plot, { + "sigma": self._internal_hyperparam_list[1], + "cutoff": self._internal_hyperparam_list[0], + } + + def set_optimal_parameters(self): + """ + Set optimal parameters. + + This function will write the determined hyperparameters directly to + MALA parameters object referenced in this class. + """ + if get_rank() == 0: + best_trial = self._get_best_trial() + minimum_score = self._study[np.argmin(self._study[:, -1])] + if isinstance(self._descriptor_calculator, Bispectrum): + self.params.descriptors.bispectrum_cutoff = best_trial[0] + self.params.descriptors.bispectrum_twojmax = int(best_trial[1]) + printout( + "Descriptor scoring analysis finished, optimal parameters: ", + ) + printout( + "Bispectrum twojmax: ", + self.params.descriptors.bispectrum_twojmax, + ) + printout( + "Bispectrum cutoff: ", + self.params.descriptors.bispectrum_cutoff, + ) + if isinstance(self._descriptor_calculator, AtomicDensity): + self.params.descriptors.atomic_density_cutoff = best_trial[0] + self.params.descriptors.atomic_density_sigma = best_trial[1] + printout( + "Descriptor scoring analysis finished, optimal parameters: ", + ) + printout( + "Atomic density sigma: ", + self.params.descriptors.atomic_density_sigma, + ) + printout( + "Atomic density cutoff: ", + self.params.descriptors.atomic_density_cutoff, + ) + + @abstractmethod + def _get_best_trial(self): + """Determine the best trial as given by this study.""" + pass + + def _construct_hyperparam_list(self): + if isinstance(self._descriptor_calculator, Bispectrum): + if ( + list( + map( + lambda p: "bispectrum_cutoff" in p.name, + self.params.hyperparameters.hlist, + ) + ).count(True) + == 0 + ): + first_dim_list = [self.params.descriptors.bispectrum_cutoff] + else: + first_dim_list = self.params.hyperparameters.hlist[ + list( + map( + lambda p: "bispectrum_cutoff" in p.name, + self.params.hyperparameters.hlist, + ) + ).index(True) + ].choices + + if ( + list( + map( + lambda p: "bispectrum_twojmax" in p.name, + self.params.hyperparameters.hlist, + ) + ).count(True) + == 0 + ): + second_dim_list = [self.params.descriptors.bispectrum_twojmax] + else: + second_dim_list = self.params.hyperparameters.hlist[ + list( + map( + lambda p: "bispectrum_twojmax" in p.name, + self.params.hyperparameters.hlist, + ) + ).index(True) + ].choices + + self._internal_hyperparam_list = [first_dim_list, second_dim_list] + self._labels = ["cutoff", "twojmax"] + + elif isinstance(self._descriptor_calculator, AtomicDensity): + if ( + list( + map( + lambda p: "atomic_density_cutoff" in p.name, + self.params.hyperparameters.hlist, + ) + ).count(True) + == 0 + ): + first_dim_list = [ + self.params.descriptors.atomic_density_cutoff + ] + else: + first_dim_list = self.params.hyperparameters.hlist[ + list( + map( + lambda p: "atomic_density_cutoff" in p.name, + self.params.hyperparameters.hlist, + ) + ).index(True) + ].choices + + if ( + list( + map( + lambda p: "atomic_density_sigma" in p.name, + self.params.hyperparameters.hlist, + ) + ).count(True) + == 0 + ): + second_dim_list = [ + self.params.descriptors.atomic_density_sigma + ] + else: + second_dim_list = self.params.hyperparameters.hlist[ + list( + map( + lambda p: "atomic_density_sigma" in p.name, + self.params.hyperparameters.hlist, + ) + ).index(True) + ].choices + self._internal_hyperparam_list = [first_dim_list, second_dim_list] + self._labels = ["cutoff", "sigma"] + + else: + raise Exception( + "Unkown descriptor calculator selected. Cannot " + "perform descriptor scoring optimization." + ) + + def _calculate_descriptors(self, snapshot, description, original_units): + descriptor_calculation_kwargs = {} + tmp_input = None + if description["input"] == "espresso-out": + descriptor_calculation_kwargs["units"] = original_units["input"] + tmp_input, local_size = ( + self._descriptor_calculator.calculate_from_qe_out( + snapshot["input"], **descriptor_calculation_kwargs + ) + ) + + elif description["input"] is None: + # In this case, only the output is processed. + pass + + else: + raise Exception( + "Unknown file extension, cannot convert descriptor" + ) + if self.params.descriptors._configuration["mpi"]: + tmp_input = self._descriptor_calculator.gather_descriptors( + tmp_input + ) + + return tmp_input + + def _load_target( + self, snapshot, description, original_units, file_based_communication + ): + memmap = None + if ( + self.params.descriptors._configuration["mpi"] + and file_based_communication + ): + memmap = "descriptor_scoring.out.npy_temp" + + target_calculator_kwargs = {} + + # Read the output data + tmp_output = None + if description["output"] == ".cube": + target_calculator_kwargs["units"] = original_units["output"] + target_calculator_kwargs["use_memmap"] = memmap + # If no units are provided we just assume standard units. + tmp_output = self._target_calculator.read_from_cube( + snapshot["output"], **target_calculator_kwargs + ) + + elif description["output"] == ".xsf": + target_calculator_kwargs["units"] = original_units["output"] + target_calculator_kwargs["use_memmap"] = memmap + # If no units are provided we just assume standard units. + tmp_output = self._target_calculator.read_from_xsf( + snapshot["output"], **target_calculator_kwargs + ) + + elif description["output"] == "numpy": + if get_rank() == 0: + tmp_output = self._target_calculator.read_from_numpy_file( + snapshot["output"], units=original_units["output"] + ) + + elif description["output"] == "openpmd": + if get_rank() == 0: + tmp_output = self._target_calculator.read_from_numpy_file( + snapshot["output"], units=original_units["output"] + ) + else: + raise Exception("Unknown file extension, cannot convert target") + + if get_rank() == 0: + if ( + self.params.targets._configuration["mpi"] + and file_based_communication + ): + os.remove(memmap) + + return tmp_output + + @abstractmethod + def _update_logging(self, score, index): + pass + + @abstractmethod + def _calculate_score(self, descriptor, target): + pass diff --git a/mala/network/hyperparameter.py b/mala/network/hyperparameter.py index 4294d6a4f..17f0111ab 100644 --- a/mala/network/hyperparameter.py +++ b/mala/network/hyperparameter.py @@ -158,12 +158,12 @@ def __new__( hparam = HyperparameterOAT( hotype=hotype, opttype=opttype, name=name, choices=choices ) - if hotype == "acsd": - from mala.network.hyperparameter_acsd import ( - HyperparameterACSD, + if hotype == "descriptor_scoring": + from mala.network.hyperparameter_descriptor_scoring import ( + HyperparameterDescriptorScoring, ) - hparam = HyperparameterACSD( + hparam = HyperparameterDescriptorScoring( hotype=hotype, opttype=opttype, name=name, diff --git a/mala/network/hyperparameter_acsd.py b/mala/network/hyperparameter_descriptor_scoring.py similarity index 92% rename from mala/network/hyperparameter_acsd.py rename to mala/network/hyperparameter_descriptor_scoring.py index 6ecee0e76..34428f1d6 100644 --- a/mala/network/hyperparameter_acsd.py +++ b/mala/network/hyperparameter_descriptor_scoring.py @@ -3,7 +3,7 @@ from mala.network.hyperparameter import Hyperparameter -class HyperparameterACSD(Hyperparameter): +class HyperparameterDescriptorScoring(Hyperparameter): """Represents an optuna parameter. Parameters @@ -44,7 +44,7 @@ def __init__( high=0, choices=None, ): - super(HyperparameterACSD, self).__init__( + super(HyperparameterDescriptorScoring, self).__init__( opttype=opttype, name=name, low=low, high=high, choices=choices ) diff --git a/mala/network/mutual_information_analyzer.py b/mala/network/mutual_information_analyzer.py new file mode 100644 index 000000000..f563bd648 --- /dev/null +++ b/mala/network/mutual_information_analyzer.py @@ -0,0 +1,213 @@ +"""Class for performing a full mutual information analysis.""" + +import numpy as np + + +from mala.common.parallelizer import parallel_warn +from mala.network.descriptor_scoring_optimizer import ( + DescriptorScoringOptimizer, +) + + +class MutualInformationAnalyzer(DescriptorScoringOptimizer): + """ + Analyzer based on mutual information analysis. + + Parameters + ---------- + params : mala.common.parametes.Parameters + Parameters used to create this hyperparameter optimizer. + + descriptor_calculator : mala.descriptors.descriptor.Descriptor + The descriptor calculator used for parsing/converting fingerprint + data. If None, the descriptor calculator will be created by this + object using the parameters provided. Default: None + + target_calculator : mala.targets.target.Target + Target calculator used for parsing/converting target data. If None, + the target calculator will be created by this object using the + parameters provided. Default: None + """ + + def __init__( + self, params, target_calculator=None, descriptor_calculator=None + ): + parallel_warn( + "The MutualInformationAnalyzer is still in its " + "experimental stage. The API is consistent with " + "MALA hyperparameter optimization and will likely not " + "change, but the internal algorithm may be subject " + "to changes in the near-future." + ) + super(MutualInformationAnalyzer, self).__init__( + params, + target_calculator=target_calculator, + descriptor_calculator=descriptor_calculator, + ) + + def _get_best_trial(self): + """Determine the best trial as given by this study.""" + return self._study[np.argmax(self._study[:, -1])] + + def _update_logging(self, score, index): + if self.best_score is None: + self.best_score = score + self.best_trial_index = index + elif score > self.best_score: + self.best_score = score + self.best_trial_index = index + + def _calculate_score(self, descriptor, target): + return self._calculate_mutual_information( + descriptor, + target, + self.params.hyperparameters.mutual_information_points, + descriptor_vectors_contain_xyz=self.params.descriptors.descriptors_contain_xyz, + ) + + @staticmethod + def _calculate_mutual_information( + descriptor_data, + ldos_data, + n_samples, + descriptor_vectors_contain_xyz=True, + ): + """ + Calculate the Mutual Information for given descriptor and LDOS data. + + Mutual Information measures how well the descriptors capture the + relevant information that is needed to predict the LDOS. + The unit of MI is bits. + + Parameters + ---------- + descriptor_data : numpy.ndarray + Array containing the descriptors. + + ldos_data : numpy.ndarray + Array containing the LDOS. + + n_samples : int + The number of points for which to calculate the mutual information. + + Returns + ------- + mi : float + The mutual information between the descriptor and the LDOS in bits. + + """ + descriptor_dim = np.shape(descriptor_data) + ldos_dim = np.shape(ldos_data) + if len(descriptor_dim) == 4: + descriptor_data = np.reshape( + descriptor_data, + ( + descriptor_dim[0] * descriptor_dim[1] * descriptor_dim[2], + descriptor_dim[3], + ), + ) + if descriptor_vectors_contain_xyz: + descriptor_data = descriptor_data[:, 3:] + elif len(descriptor_dim) != 2: + raise Exception("Cannot work with this descriptor data.") + + if len(ldos_dim) == 4: + ldos_data = np.reshape( + ldos_data, + (ldos_dim[0] * ldos_dim[1] * ldos_dim[2], ldos_dim[3]), + ) + elif len(ldos_dim) != 2: + raise Exception("Cannot work with this LDOS data.") + + # The hyperparameters could be put potentially into the params. + mi = MutualInformationAnalyzer._mutual_information( + descriptor_data, + ldos_data, + n_components=None, + n_samples=n_samples, + covariance_type="diag", + normalize_data=True, + ) + return mi + + @staticmethod + def _normalize(data): + mean = np.mean(data, axis=0) + std = np.std(data, axis=0) + std_nonzero = std > 1e-6 + data = data[:, std_nonzero] + mean = mean[std_nonzero] + std = std[std_nonzero] + data = (data - mean) / std + return data + + @staticmethod + def _mutual_information( + X, + Y, + n_components=None, + max_iter=1000, + n_samples=100000, + covariance_type="diag", + normalize_data=False, + ): + import sklearn.mixture + import sklearn.covariance + + assert ( + covariance_type == "diag" + ), "Only support covariance_type='diag' for now" + n = X.shape[0] + dim_X = X.shape[-1] + rand_subset = np.random.permutation(n)[:n_samples] + if normalize_data: + X = MutualInformationAnalyzer._normalize(X) + Y = MutualInformationAnalyzer._normalize(Y) + X = X[rand_subset] + Y = Y[rand_subset] + XY = np.concatenate([X, Y], axis=1) + d = XY.shape[-1] + if n_components is None: + n_components = d // 2 + gmm_XY = sklearn.mixture.GaussianMixture( + n_components=n_components, + covariance_type=covariance_type, + max_iter=max_iter, + ) + gmm_XY.fit(XY) + + gmm_X = sklearn.mixture.GaussianMixture( + n_components=n_components, + covariance_type=covariance_type, + max_iter=max_iter, + ) + gmm_X.weights_ = gmm_XY.weights_ + gmm_X.means_ = gmm_XY.means_[:, :dim_X] + gmm_X.covariances_ = gmm_XY.covariances_[:, :dim_X] + gmm_X.precisions_ = gmm_XY.precisions_[:, :dim_X] + gmm_X.precisions_cholesky_ = gmm_XY.precisions_cholesky_[:, :dim_X] + + gmm_Y = sklearn.mixture.GaussianMixture( + n_components=n_components, + covariance_type=covariance_type, + max_iter=max_iter, + ) + gmm_Y.weights_ = gmm_XY.weights_ + gmm_Y.means_ = gmm_XY.means_[:, dim_X:] + gmm_Y.covariances_ = gmm_XY.covariances_[:, dim_X:] + gmm_Y.precisions_ = gmm_XY.precisions_[:, dim_X:] + gmm_Y.precisions_cholesky_ = gmm_XY.precisions_cholesky_[:, dim_X:] + + rand_perm = np.random.permutation(Y.shape[0]) + Y_perm = Y[rand_perm] + XY_perm = np.concatenate([X, Y_perm], axis=1) + temp = ( + gmm_XY.score_samples(XY_perm) + - gmm_X.score_samples(X) + - gmm_Y.score_samples(Y_perm) + ) + temp_exp = np.exp(temp) + mi = np.mean(temp_exp * temp) + # change log base e to log base 2 + mi = mi / np.log(2) + return mi diff --git a/setup.py b/setup.py index b34c3fef2..7ce1509ff 100644 --- a/setup.py +++ b/setup.py @@ -16,7 +16,7 @@ extras = { "dev": ["bump2version"], - "opt": ["oapackage"], + "opt": ["oapackage", "scikit-learn"], "test": ["pytest", "pytest-cov"], "doc": open("docs/requirements.txt").read().splitlines(), "experimental": ["asap3", "dftpy", "minterpy"], diff --git a/test/hyperopt_test.py b/test/hyperopt_test.py index 1cd8cd2c3..51fb5d199 100644 --- a/test/hyperopt_test.py +++ b/test/hyperopt_test.py @@ -12,7 +12,7 @@ # Control how much the loss should be better after hyperopt compared to # before. This value is fairly high, but we're training on absolutely # minimal amounts of data. -desired_loss_improvement_factor = 2 +desired_loss_improvement_factor = 1.5 # Different HO methods will lead to different results, but they should be # approximately the same.