diff --git a/tests/test_simulator.py b/tests/test_simulator.py
index 76e8bf3..3742573 100644
--- a/tests/test_simulator.py
+++ b/tests/test_simulator.py
@@ -32,3 +32,21 @@ def test_simulator_base():
     s.fit()
 
     MockDriver.fit.assert_called_once_with(s._data.X, s._data.y)
+
+
+def test_pdm_simulator():
+    n_samples = 10
+    data = tsgm.dataset.DatasetProperties(N=100, T=12, D=23)
+    pdm_simulator = tsgm.simulator.PredictiveMaintenanceSimulator(data)
+    syn_dataset, equipment = pdm_simulator.generate(n_samples)
+    assert len(equipment) == n_samples
+    assert len(syn_dataset) == n_samples
+    for d in equipment:
+        assert isinstance(d, dict)
+
+    new_sim = pdm_simulator.clone()
+    params1 = pdm_simulator.params()
+    params2 = new_sim.params()
+    assert params1["switches"] == params2["switches"]
+    assert params1["m_norms"] == params2["m_norms"]
+    assert params1["sigma_norms"] == params2["sigma_norms"]
diff --git a/tests/test_zoo.py b/tests/test_zoo.py
index 1f8e543..5a83cda 100644
--- a/tests/test_zoo.py
+++ b/tests/test_zoo.py
@@ -1,9 +1,6 @@
 import pytest
-import functools
 
 import numpy as np
-import random
-import tensorflow as tf
 from tensorflow.keras import layers
 import sklearn.metrics.pairwise
 
@@ -48,7 +45,8 @@ def test_zoo_cgan(model_type):
 @pytest.mark.parametrize("model_type_name", [
     "clf_cn",
     "clf_cl_n",
-    "clf_block"],
+    "clf_block",
+    "clf_transformer"],
 )
 def test_zoo_clf(model_type_name):
     seq_len = 10
diff --git a/tsgm/metrics/metrics.py b/tsgm/metrics/metrics.py
index 7efadde..afd3776 100644
--- a/tsgm/metrics/metrics.py
+++ b/tsgm/metrics/metrics.py
@@ -306,10 +306,6 @@ class EntropyMetric(Metric):
     """
     Calculates the spectral entropy of a dataset or tensor.
 
-    This metric measures the randomness or disorder in a dataset or tensor
-    using spectral entropy, which is a measure of the distribution of energy
-    in the frequency domain.
-
     Args:
         d (tsgm.dataset.DatasetOrTensor): The input dataset or tensor.
 
diff --git a/tsgm/models/architectures/zoo.py b/tsgm/models/architectures/zoo.py
index 324829a..2253c8b 100644
--- a/tsgm/models/architectures/zoo.py
+++ b/tsgm/models/architectures/zoo.py
@@ -653,7 +653,7 @@ def _build_model(self) -> keras.Model:
 
 class BasicRecurrentArchitecture(Architecture):
     """
-    Base class for basic recurrent neural network architectures.
+    Base class for recurrent neural network architectures.
 
     Inherits from Architecture.
     """
@@ -726,6 +726,74 @@ def build(self, activation: str = "sigmoid", return_sequences: bool = True) -> k
     return model
 
 
+class TransformerClfArchitecture(BaseClassificationArchitecture):
+    """
+    Transformer-based architecture for classification tasks.
+
+    Inherits from BaseClassificationArchitecture.
+    """
+
+    arch_type = "downstream:classification"
+
+    def __init__(self, seq_len: int, feat_dim: int, num_heads: int = 2, ff_dim: int = 64, n_blocks: int = 1, dropout_rate: float = 0.5, output_dim: int = 2) -> None:
+        """
+        Initializes the TransformerClfArchitecture.
+
+        :param seq_len: Length of input sequences.
+        :type seq_len: int
+        :param feat_dim: Dimensionality of input features.
+        :type feat_dim: int
+        :param num_heads: Number of attention heads (default is 2).
+        :type num_heads: int
+        :param ff_dim: Feed-forward dimension in the attention block (default is 64).
+        :type ff_dim: int
+        :param n_blocks: Number of transformer blocks (default is 1).
+        :type n_blocks: int, optional
+        :param dropout_rate: Dropout probability (default is 0.5).
+        :type dropout_rate: float, optional
+        :param output_dim: Number of output classes (default is 2).
+        :type output_dim: int, optional
+        """
+
+        self._num_heads = num_heads
+        self._ff_dim = ff_dim
+        self._n_blocks = n_blocks
+        self._dropout_rate = dropout_rate
+
+        super().__init__(seq_len, feat_dim, output_dim)
+
+    def transformer_block(self, inputs):
+        # Multi-head self-attention sublayer with residual connection and layer norm
+        attention_output = layers.MultiHeadAttention(
+            num_heads=self._num_heads,
+            key_dim=inputs.shape[-1]
+        )(inputs, inputs)
+        attention_output = layers.Dropout(self._dropout_rate)(attention_output)
+        attention_output = layers.LayerNormalization(epsilon=1e-6)(attention_output + inputs)
+
+        # Position-wise feed-forward sublayer with residual connection and layer norm
+        ff_output = layers.Dense(self._ff_dim, activation="relu")(attention_output)
+        ff_output = layers.Dense(inputs.shape[-1])(ff_output)
+        ff_output = layers.Dropout(self._dropout_rate)(ff_output)
+        ff_output = layers.LayerNormalization(epsilon=1e-6)(ff_output + attention_output)
+
+        return ff_output
+
+    def _build_model(self) -> keras.Model:
+        inputs = layers.Input(shape=(self._seq_len, self._feat_dim))
+
+        x = inputs
+        for _ in range(self._n_blocks):
+            x = self.transformer_block(x)
+
+        x = layers.GlobalAveragePooling1D()(x)
+        x = layers.Dropout(self._dropout_rate)(x)
+        outputs = layers.Dense(self._output_dim, activation="softmax")(x)
+
+        model = keras.Model(inputs, outputs)
+        return model
+
+
 class cGAN_LSTMnArchitecture(BaseGANArchitecture):
     """
     Conditional Generative Adversarial Network (cGAN) with LSTM-based architecture.
@@ -837,5 +905,6 @@ def summary(self) -> None:
     "clf_cl_n": ConvnLSTMnArchitecture,
     "clf_block": BlockClfArchitecture,
     "recurrent": BasicRecurrentArchitecture,
+    "clf_transformer": TransformerClfArchitecture,
 }
 )
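
A minimal usage sketch of the new "clf_transformer" entry (it assumes the documented tsgm.models.architectures.zoo registry access and the model property exposed by the other classification architectures; the random data is purely illustrative):

import numpy as np
import tsgm

# Look up the new architecture in the zoo and build the underlying keras model.
arch = tsgm.models.architectures.zoo["clf_transformer"](
    seq_len=10, feat_dim=4, output_dim=2, n_blocks=2)
model = arch.model
model.compile(optimizer="adam", loss="categorical_crossentropy")

X = np.random.normal(size=(8, 10, 4))           # (batch, seq_len, feat_dim)
y = np.eye(2)[np.random.randint(0, 2, size=8)]  # one-hot labels, output_dim=2
model.fit(X, y, epochs=1)
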
diff --git a/tsgm/simulator.py b/tsgm/simulator.py
index 2f1189b..ff13f8c 100644
--- a/tsgm/simulator.py
+++ b/tsgm/simulator.py
@@ -1,5 +1,7 @@
 import abc
 import copy
+import sklearn.preprocessing
+from tqdm import tqdm
 import typing as T
 import numpy as np
 import tensorflow_probability as tfp
@@ -9,51 +11,216 @@
 
 
 class BaseSimulator(abc.ABC):
+    """
+    Abstract base class defining the simulator interface.
+
+    Methods
+    -------
+    generate(num_samples: int, *args) -> tsgm.dataset.Dataset
+        Generate a dataset with the specified number of samples.
+
+    dump(path: str, format: str = "csv") -> None
+        Save the generated dataset to a file in the specified format.
+    """
     @abc.abstractmethod
     def generate(self, num_samples: int, *args) -> tsgm.dataset.Dataset:
+        """
+        Abstract method to generate a dataset.
+
+        Parameters
+        ----------
+        num_samples : int
+            Number of samples to generate.
+        *args
+            Additional arguments to be passed to the method.
+
+        Returns
+        -------
+        tsgm.dataset.Dataset
+            The generated dataset.
+        """
         pass
 
     @abc.abstractmethod
     def dump(self, path: str, format: str = "csv") -> None:
+        """
+        Abstract method to save the generated dataset to a file.
+
+        Parameters
+        ----------
+        path : str
+            The file path where the dataset will be saved.
+        format : str, optional
+            The format in which to save the dataset, by default "csv".
+        """
         pass
 
 
 class Simulator(BaseSimulator):
+    """
+    Concrete simulator backed by an optional driver model. It implements fitting and cloning,
+    but leaves the generate and dump methods to subclasses.
+
+    Attributes
+    ----------
+    _data : tsgm.dataset.DatasetProperties
+        Properties of the dataset to be used by the simulator.
+    _driver : Optional[tsgm.types.Model]
+        The model to be used for generating data.
+    """
     def __init__(self, data: tsgm.dataset.DatasetProperties, driver: T.Optional[tsgm.types.Model] = None):
+        """
+        Initialize the Simulator with dataset properties and an optional model.
+
+        Parameters
+        ----------
+        data : tsgm.dataset.DatasetProperties
+            Properties of the dataset to be used.
+        driver : Optional[tsgm.types.Model], optional
+            The model to be used for generating data, by default None.
+        """
        self._data = data
        self._driver = driver

    def fit(self, **kwargs) -> None:
+        """
+        Fit the driver model on the dataset.
+
+        Parameters
+        ----------
+        **kwargs
+            Additional keyword arguments to pass to the model's fit method.
+        """
        if self._data.y is not None:
            self._driver.fit(self._data.X, self._data.y, **kwargs)
        else:
            self._driver.fit(self._data.X, **kwargs)

    def generate(self, num_samples: int, *args) -> TensorLike:
+        """
+        Method to generate a dataset. Not implemented in this class.
+
+        Parameters
+        ----------
+        num_samples : int
+            Number of samples to generate.
+        *args
+            Additional arguments to be passed to the method.
+
+        Returns
+        -------
+        TensorLike
+            The generated dataset.
+
+        Raises
+        ------
+        NotImplementedError
+            This method is not implemented in this class.
+        """
        raise NotImplementedError

    def dump(self, path: str, format: str = "csv") -> None:
+        """
+        Method to save the generated dataset to a file. Not implemented in this class.
+
+        Parameters
+        ----------
+        path : str
+            The file path where the dataset will be saved.
+        format : str, optional
+            The format in which to save the dataset, by default "csv".
+
+        Raises
+        ------
+        NotImplementedError
+            This method is not implemented in this class.
+        """
        raise NotImplementedError

    def clone(self) -> "Simulator":
+        """
+        Create a copy of the simulator. Note that the driver is not carried over.
+
+        Returns
+        -------
+        Simulator
+            A new simulator with a deep copy of the dataset properties.
+        """
        return Simulator(copy.deepcopy(self._data))
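
A quick illustration of the fitting contract documented above, with a hypothetical driver (the sketch assumes a tsgm.dataset.Dataset, which carries X and y as in tests/test_simulator.py, is accepted as the data argument):

import numpy as np
import tsgm

class MockModel:
    # Hypothetical driver: anything exposing fit(X[, y], **kwargs) works.
    def fit(self, X, y=None, **kwargs):
        self.fitted = True

data = tsgm.dataset.Dataset(x=np.zeros((8, 12, 3)), y=np.zeros(8))
sim = tsgm.simulator.Simulator(data, driver=MockModel())
sim.fit()               # delegates to driver.fit(data.X, data.y)
sim_copy = sim.clone()  # deep-copies the data; the driver is not carried over
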
 
 
 class ModelBasedSimulator(Simulator):
+    """
+    A simulator that is based on a model. This class extends the Simulator class and provides additional
+    methods for handling model parameters.
+
+    Methods
+    -------
+    params() -> T.Dict[str, T.Any]
+        Get a dictionary of the simulator's parameters.
+
+    set_params(params: T.Dict[str, T.Any]) -> None
+        Set the simulator's parameters from a dictionary.
+
+    generate(num_samples: int, *args) -> None
+        Abstract method; generate a dataset with the specified number of samples.
+    """
     def __init__(self, data: tsgm.dataset.DatasetProperties):
+        """
+        Initialize the ModelBasedSimulator with dataset properties.
+
+        Parameters
+        ----------
+        data : tsgm.dataset.DatasetProperties
+            Properties of the dataset to be used.
+        """
         super().__init__(data)
 
     def params(self) -> T.Dict[str, T.Any]:
+        """
+        Get a dictionary of the simulator's parameters.
+
+        Returns
+        -------
+        dict
+            A dictionary containing the simulator's parameters.
+        """
         params = copy.deepcopy(self.__dict__)
-        del params["_data"], params["_driver"]
+        if "_data" in params:
+            del params["_data"]
+        if "_driver" in params:
+            del params["_driver"]
         return params
 
     def set_params(self, params: T.Dict[str, T.Any]) -> None:
+        """
+        Set the simulator's parameters from a dictionary.
+
+        Parameters
+        ----------
+        params : dict
+            A dictionary containing the parameters to set.
+        """
         for param_name, param_value in params.items():
             self.__dict__[param_name] = param_value
 
     @abc.abstractmethod
     def generate(self, num_samples: int, *args) -> None:
+        """
+        Abstract method to generate a dataset. Must be implemented by subclasses.
+
+        Parameters
+        ----------
+        num_samples : int
+            Number of samples to generate.
+        *args
+            Additional arguments to be passed to the method.
+
+        Raises
+        ------
+        NotImplementedError
+            This method is not implemented in this class and must be overridden by subclasses.
+        """
         raise NotImplementedError
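
The params/set_params pair above is what makes the clone implementations below cheap: clone re-instantiates the simulator and replays the parameter dictionary. A sketch using the SineConstSimulator defined just below:

import tsgm

props = tsgm.dataset.DatasetProperties(N=100, T=24, D=3)
sim = tsgm.simulator.SineConstSimulator(props, max_scale=2.0)

params = sim.params()       # snapshot of __dict__ without _data and _driver
print(params["max_scale"])  # 2.0
sim.set_params(max_scale=4.0, max_const=5.0)  # re-draws the tfp distributions
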
@@ -63,12 +230,40 @@ def clone(self) -> "NNSimulator":
 
 
 class SineConstSimulator(ModelBasedSimulator):
+    """
+    Sine and Constant Function Simulator class that extends the ModelBasedSimulator base class.
+
+    Attributes:
+        _scale: TensorFlow Probability distribution for the scaling factor.
+        _const: TensorFlow Probability distribution for the constant.
+        _shift: TensorFlow Probability distribution for the shift.
+
+    Methods:
+        __init__(data, max_scale=10.0, max_const=5.0): Initializes the simulator with dataset properties and optional parameters.
+        set_params(max_scale, max_const, *args, **kwargs): Sets the parameters for scale, constant, and shift distributions.
+        generate(num_samples, *args) -> tsgm.dataset.Dataset: Generates a dataset based on sine and constant functions.
+        clone() -> SineConstSimulator: Creates and returns a copy of the current simulator.
+    """
     def __init__(self, data: tsgm.dataset.DatasetProperties, max_scale: float = 10.0, max_const: float = 5.0) -> None:
+        """
+        Initializes the SineConstSimulator with dataset properties and optional maximum scale and constant values.
+
+        Args:
+            data (tsgm.dataset.DatasetProperties): Dataset properties for the simulator.
+            max_scale (float, optional): Maximum value for the scale parameter. Defaults to 10.0.
+            max_const (float, optional): Maximum value for the constant parameter. Defaults to 5.0.
+        """
         super().__init__(data)
 
         self.set_params(max_scale, max_const)
 
     def set_params(self, max_scale: float, max_const: float, *args, **kwargs):
+        """
+        Sets the parameters for scale, constant, and shift distributions.
+
+        Args:
+            max_scale (float): Maximum value for the scale parameter.
+            max_const (float): Maximum value for the constant parameter.
+        """
         self._scale = tfp.distributions.Uniform(0, max_scale)
         self._const = tfp.distributions.Uniform(0, max_const)
         self._shift = tfp.distributions.Uniform(0, 2)
@@ -76,6 +271,15 @@ def set_params(self, max_scale: float, max_const: float, *args, **kwargs):
         super().set_params({"max_scale": max_scale, "max_const": max_const})
 
     def generate(self, num_samples: int, *args) -> tsgm.dataset.Dataset:
+        """
+        Generates a dataset based on sine and constant functions.
+
+        Args:
+            num_samples (int): Number of samples to generate.
+
+        Returns:
+            tsgm.dataset.Dataset: A dataset containing the generated samples.
+        """
         result_X, result_y = [], []
         for i in range(num_samples):
             scales = self._scale.sample(self._data.D)
@@ -91,7 +295,218 @@ def generate(self, num_samples: int, *args) -> tsgm.dataset.Dataset:
         return tsgm.dataset.Dataset(x=np.array(result_X), y=np.array(result_y))
 
     def clone(self) -> "SineConstSimulator":
+        """
+        Creates a copy of the current SineConstSimulator instance.
+
+        Returns:
+            SineConstSimulator: A new SineConstSimulator sharing the dataset properties, with the same parameters.
+        """
         copy_simulator = SineConstSimulator(self._data)
         params = self.params()
         copy_simulator.set_params(max_scale=params["max_scale"], max_const=params["max_const"])
         return copy_simulator
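
An end-to-end sketch of the simulator above (the shape comment assumes generate stacks num_samples series of length T with D features, matching the DatasetProperties, and that the returned Dataset exposes the arrays via X and y):

import tsgm

props = tsgm.dataset.DatasetProperties(N=100, T=24, D=3)
sim = tsgm.simulator.SineConstSimulator(props)

dataset = sim.generate(num_samples=8)  # mixture of sine and constant series
print(dataset.X.shape)                 # expected (8, 24, 3): samples x T x D

sim_copy = sim.clone()                 # same max_scale / max_const parameters
assert sim_copy.params()["max_const"] == sim.params()["max_const"]
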
+
+
+class PredictiveMaintenanceSimulator(ModelBasedSimulator):
+    """
+    Predictive Maintenance Simulator class that extends the ModelBasedSimulator base class.
+    The simulator is based on https://github.com/AaltoPML/human-in-the-loop-predictive-maintenance
+    and the publication:
+    Nikitin, Alexander, and Samuel Kaski. "Human-in-the-loop large-scale predictive maintenance of
+    workstations." Proceedings of the 28th ACM SIGKDD Conference on Knowledge Discovery and Data Mining. 2022.
+
+    Attributes:
+        CAT_FEATURES (list): List of categorical feature indices.
+        encoders (dict): Dictionary of OneHotEncoders for categorical features.
+    Methods:
+        __init__(data): Initializes the simulator with dataset properties and sets encoders.
+        S(lmbd, t): Calculates the survival function S(t) = exp(-lmbd * t).
+        R(rho, lmbd, t): Calculates the repair threshold R(t) = (1 - S(t)) - rho.
+        set_params(**kwargs): Sets the parameters for the simulator.
+        mixture_function(a, x): Calculates the mixture function.
+        sample_equipment(num_samples): Samples equipment data and generates the dataset.
+        generate(num_samples): Generates the predictive maintenance dataset.
+        clone() -> PredictiveMaintenanceSimulator: Creates and returns a copy of the current simulator.
+    """
+
+    # categorical features
+    CAT_FEATURES = [0, 1, 2, 3, 4, 5, 6, 7]
+
+    def __init__(self, data: tsgm.dataset.DatasetProperties) -> None:
+        """
+        Initializes the PredictiveMaintenanceSimulator with dataset properties and sets encoders for categorical features.
+
+        Args:
+            data (tsgm.dataset.DatasetProperties): Dataset properties for the simulator.
+        """
+        super().__init__(data)
+        self.encoders = {d: sklearn.preprocessing.OneHotEncoder() for d in self.CAT_FEATURES}
+
+        for d in self.CAT_FEATURES:
+            self.encoders[d].fit([[d], [d + 2], [d + 4], [d + 1], [d + 3], [d + 5], [d + 7]])
+        self.set_params()
+
+    def S(self, lmbd, t):
+        """
+        Calculates the survival function S(t) = exp(-lmbd * t).
+
+        Args:
+            lmbd: Lambda parameter of the exponential distribution.
+            t: Time variable.
+
+        Returns:
+            float: Survival probability at time t.
+        """
+        return np.exp(-lmbd * t)
+
+    def R(self, rho, lmbd, t):
+        """
+        Calculates the repair threshold R(t) = (1 - S(t)) - rho.
+
+        Args:
+            rho: Rho parameter of the repair function.
+            lmbd: Lambda parameter of the exponential distribution.
+            t: Time variable.
+
+        Returns:
+            float: Repair threshold at time t.
+        """
+        s_ = self.S(lmbd, t)
+        return (1 - s_) - rho
+
+    def set_params(self, **kwargs):
+        """
+        Sets the parameters for the simulator.
+
+        Args:
+            **kwargs: Optional simulator parameters (switches, m_norms, sigma_norms); missing ones are sampled from gamma distributions.
+        """
+        if "switches" in kwargs:
+            self._switches = kwargs["switches"]
+        else:
+            self._switches = {d: np.random.gamma(4, 2) for d in range(self._data.D)}
+
+        if "m_norms" in kwargs:
+            self._m_norms = kwargs["m_norms"]
+        else:
+            self._m_norms = {d: lambda: np.random.gamma(2, 1) for d in range(self._data.D)}
+
+        if "sigma_norms" in kwargs:
+            self._sigma_norms = kwargs["sigma_norms"]
+        else:
+            self._sigma_norms = {d: lambda: np.random.gamma(1, 1) for d in range(self._data.D)}
+
+        super().set_params({
+            "switches": self._switches,
+            "m_norms": self._m_norms,
+            "sigma_norms": self._sigma_norms
+        })
+
+    def mixture_function(self, a, x):
+        """
+        Calculates the mixture function (a**x - 1) / (a - 1).
+
+        Args:
+            a: Mixture parameter.
+            x: Input variable.
+
+        Returns:
+            float: Mixture function value.
+        """
+        return (a**x - 1) / (a - 1)
+
+    def sample_equipment(self, num_samples):
+        """
+        Samples equipment data and generates the dataset.
+
+        Args:
+            num_samples (int): Number of samples to generate.
+
+        Returns:
+            tuple: A tuple (dataset, equipment) with the generated measurements and per-machine metadata.
+        """
+        equipment, dataset = [], []
+        for _ in tqdm(range(num_samples)):
+            last_norm_tmp = 0
+            lmbd = np.random.gamma(1, 0.005)
+            rho = np.random.gamma(1, 0.1)
+            equipment.append({
+                "lambda": lmbd,
+                "rho": rho
+            })
+            current_measurements = []
+            ss = []
+            fix_tmps = []
+            rnd = np.random.uniform(0, 1)
+            for t in range(self._data.T):
+                measurements = []
+
+                s_ = self.S(lmbd, t - last_norm_tmp)
+                r_ = self.R(rho, lmbd, t - last_norm_tmp)
+                ss.append(s_)
+
+                if rnd < r_:
+                    rnd = np.random.uniform(0, 1)
+                    last_norm_tmp = t
+                    fix_tmps.append(t)
+
+                for d in range(self._data.D):
+                    m_norm = self._m_norms[d]()
+                    sigma_norm = self._sigma_norms[d]()
+
+                    m_abnorm = m_norm + self._switches[d]
+                    sigma_abnorm = 1.5 * sigma_norm
+
+                    if d in self.CAT_FEATURES:
+                        norm_functioning = np.random.choice([d, d + 2, d + 4], p=[0.7, 0.2, 0.1])
+                        abnorm_functioning = np.random.choice([d + 1, d + 3, d + 5, d + 7], p=[0.2, 0.2, 0.4, 0.2])
+                    else:
+                        norm_functioning = np.random.normal(m_norm, sigma_norm)
+                        abnorm_functioning = np.random.normal(m_abnorm, sigma_abnorm)
+
+                    mixt = self.mixture_function(3, s_)
+                    if d in self.CAT_FEATURES:
+                        if rnd < 1 - s_:
+                            measurements.extend(self.encoders[d].transform([[abnorm_functioning]]).toarray()[0])
+                        else:
+                            measurements.extend(self.encoders[d].transform([[norm_functioning]]).toarray()[0])
+                    else:
+                        measurements.extend([mixt * norm_functioning + (1 - mixt) * abnorm_functioning])
+
+                if not len(current_measurements):
+                    current_measurements.append([measurements])
+                    current_measurements = np.array(current_measurements[0])
+                else:
+                    current_measurements = np.concatenate((current_measurements, np.array(measurements)[np.newaxis, :]), axis=0)
+            equipment[-1]["fixes"] = fix_tmps
+            equipment[-1]["ss"] = ss
+            dataset.append(current_measurements)
+        dataset = np.transpose(np.array(dataset), [0, 2, 1])
+        return dataset, equipment
+
+    def generate(self, num_samples: int) -> T.Tuple[TensorLike, T.List[T.Dict]]:
+        """
+        Generates the predictive maintenance dataset.
+
+        Args:
+            num_samples (int): Number of samples to generate.
+
+        Returns:
+            tuple: A tuple (dataset, equipment), as produced by sample_equipment.
+        """
+        return self.sample_equipment(num_samples)
+
+    def clone(self) -> "PredictiveMaintenanceSimulator":
+        """
+        Creates a copy of the current PredictiveMaintenanceSimulator instance.
+
+        Returns:
+            PredictiveMaintenanceSimulator: A new simulator sharing the dataset properties, with the same parameters.
+        """
+        copy_simulator = PredictiveMaintenanceSimulator(self._data)
+        params = self.params()
+        copy_simulator.set_params(
+            switches=params["switches"],
+            m_norms=params["m_norms"],
+            sigma_norms=params["sigma_norms"])
+        return copy_simulator
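
Finally, a usage sketch for the predictive-maintenance simulator, mirroring test_pdm_simulator above. Each machine degrades along the survival curve S(t) = exp(-lambda * (t - t_fix)) and is repaired once a uniform draw falls below R(t) = (1 - S(t)) - rho, which resets t_fix; note that one-hot encoding of the categorical features widens the feature axis beyond D:

import tsgm

props = tsgm.dataset.DatasetProperties(N=100, T=12, D=23)
pdm = tsgm.simulator.PredictiveMaintenanceSimulator(props)

syn_dataset, equipment = pdm.generate(num_samples=10)
print(syn_dataset.shape)       # (10, n_features, 12): samples x features x time
print(equipment[0]["lambda"])  # per-machine failure rate
print(equipment[0]["fixes"])   # time steps at which this machine was repaired
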