diff --git a/tests/test_timegan.py b/tests/test_timegan.py index 7296936..6a8d179 100644 --- a/tests/test_timegan.py +++ b/tests/test_timegan.py @@ -1,4 +1,5 @@ import pytest +from unittest.mock import Mock import tsgm import tensorflow as tf @@ -14,7 +15,12 @@ def test_timegan(): dataset = _gen_dataset(batch_size, seq_len, feature_dim) timegan = tsgm.models.timeGAN.TimeGAN( - seq_len=seq_len, module="gru", hidden_dim=latent_dim, n_features=feature_dim, n_layers=3, batch_size=batch_size + seq_len=seq_len, + module="gru", + hidden_dim=latent_dim, + n_features=feature_dim, + n_layers=3, + batch_size=batch_size, ) timegan.compile() timegan.fit(dataset, epochs=1) @@ -34,7 +40,12 @@ def test_timegan_on_dataset(): dataset = _gen_tf_dataset(batch_size, seq_len, feature_dim) # tf.data.Dataset timegan = tsgm.models.timeGAN.TimeGAN( - seq_len=seq_len, module="gru", hidden_dim=latent_dim, n_features=feature_dim, n_layers=3, batch_size=batch_size + seq_len=seq_len, + module="gru", + hidden_dim=latent_dim, + n_features=feature_dim, + n_layers=3, + batch_size=batch_size, ) timegan.compile() timegan.fit(dataset, epochs=1) @@ -91,7 +102,6 @@ def _gen_tf_dataset(no, seq_len, dim): def _check_internals(timegan): - # Check internal nets assert timegan.generator is not None assert timegan.discriminator is not None @@ -111,3 +121,150 @@ def _check_internals(timegan): assert timegan.embedder_opt is not None assert timegan.autoencoder_opt is not None assert timegan.adversarialsup_opt is not None + + +def test_losstracker(): + losstracker = tsgm.models.timeGAN.LossTracker() + losstracker["foo"] = 0.1 + assert isinstance(losstracker.to_numpy(), np.ndarray) + assert isinstance(losstracker.labels(), list) + + +@pytest.fixture +def mocked_gradienttape(mocker): + mock = Mock() + mock.gradient.return_value = [1.0, 1.0, 1.0] + return mock + + +def test_train_timegan(mocked_gradienttape): + latent_dim = 24 + feature_dim = 6 + seq_len = 24 + batch_size = 2 + + dataset = _gen_dataset(batch_size, seq_len, feature_dim) + timegan = tsgm.models.timeGAN.TimeGAN( + seq_len=seq_len, + module="gru", + hidden_dim=latent_dim, + n_features=feature_dim, + n_layers=3, + batch_size=batch_size, + ) + timegan.compile() + timegan.fit(dataset, epochs=1) + batches = timegan._get_data_batch(dataset, n_windows=len(dataset)) + assert timegan._train_autoencoder(next(batches), timegan.autoencoder_opt) + assert timegan._train_supervisor(next(batches), timegan.adversarialsup_opt) + assert timegan._train_generator( + next(batches), next(timegan.get_noise_batch()), timegan.generator_opt + ) + assert timegan._train_embedder(next(batches), timegan.embedder_opt) + assert timegan._train_discriminator( + next(batches), next(timegan.get_noise_batch()), timegan.discriminator_opt + ) + + +@pytest.fixture +def mock_optimizer(): + yield tf.keras.optimizers.Adam(learning_rate=0.001) + + +@pytest.fixture +def mocked_data(): + feature_dim = 6 + seq_len = 24 + batch_size = 16 + yield _gen_tf_dataset(batch_size, seq_len, feature_dim) + + +@pytest.fixture +def mocked_timegan(mocked_data): + latent_dim = 24 + feature_dim = 6 + seq_len = 24 + batch_size = 16 + + timegan = tsgm.models.timeGAN.TimeGAN( + seq_len=seq_len, + module="gru", + hidden_dim=latent_dim, + n_features=feature_dim, + n_layers=3, + batch_size=batch_size, + ) + timegan.compile() + timegan.fit(mocked_data, epochs=1) + yield timegan + + +def test_timegan_train_autoencoder(mocked_data, mocked_timegan): + batches = iter(mocked_data.repeat()) + + mocked_timegan._define_timegan() + X_ = next(batches) + loss = mocked_timegan._train_autoencoder(X_, mocked_timegan.autoencoder_opt) + + # Assert that the loss is a float + assert loss.dtype in [tf.float32, tf.float64] + + +def test_timegan_train_embedder(mocked_data, mocked_timegan): + batches = iter(mocked_data.repeat()) + + mocked_timegan._define_timegan() + X_ = next(batches) + _, loss = mocked_timegan._train_embedder(X_, mocked_timegan.embedder_opt) + + # Assert that the loss is a float + assert loss.dtype in [tf.float32, tf.float64] + + +def test_timegan_train_generator(mocked_data, mocked_timegan): + batches = iter(mocked_data.repeat()) + + mocked_timegan._define_timegan() + X_ = next(batches) + Z_ = next(mocked_timegan.get_noise_batch()) + ( + step_g_loss_u, + step_g_loss_u_e, + step_g_loss_s, + step_g_loss_v, + step_g_loss, + ) = mocked_timegan._train_generator(X_, Z_, mocked_timegan.generator_opt) + + # Assert that the loss is a float + for loss in ( + step_g_loss_u, + step_g_loss_u_e, + step_g_loss_s, + step_g_loss_v, + step_g_loss, + ): + assert loss.dtype in [tf.float32, tf.float64] + + +def test_timegan_check_discriminator_loss(mocked_data, mocked_timegan): + batches = iter(mocked_data.repeat()) + + mocked_timegan._define_timegan() + X_ = next(batches) + Z_ = next(mocked_timegan.get_noise_batch()) + loss = mocked_timegan._check_discriminator_loss(X_, Z_) + + # Assert that the loss is a float + assert loss.dtype in [tf.float32, tf.float64] + + +def test_timegan_train_discriminator(mocked_data, mocked_timegan): + batches = iter(mocked_data.repeat()) + + mocked_timegan._define_timegan() + X_ = next(batches) + Z_ = next(mocked_timegan.get_noise_batch()) + loss = mocked_timegan._train_discriminator(X_, Z_, mocked_timegan.discriminator_opt) + + # Assert that the loss is a float + assert loss.dtype in [tf.float32, tf.float64] diff --git a/tsgm/models/timeGAN.py b/tsgm/models/timeGAN.py index 7459864..840e748 100644 --- a/tsgm/models/timeGAN.py +++ b/tsgm/models/timeGAN.py @@ -492,7 +492,7 @@ def fit( self._define_timegan() # 1. Embedding network training - print("Start Embedding Network Training") + logger.info("Start Embedding Network Training") for epoch in tqdm(range(epochs), desc="Autoencoder - training"): X_ = next(batches) @@ -500,13 +500,13 @@ def fit( # Checkpoint if checkpoints_interval is not None and epoch % checkpoints_interval == 0: - print(f"step: {epoch}/{epochs}, e_loss: {step_e_loss_0}") + logger.info(f"step: {epoch}/{epochs}, e_loss: {step_e_loss_0}") self.training_losses_history["autoencoder"] = float(step_e_loss_0) - print("Finished Embedding Network Training") + logger.info("Finished Embedding Network Training") # 2. Training only with supervised loss - print("Start Training with Supervised Loss Only") + logger.info("Start Training with Supervised Loss Only") # Adversarial Supervised network training for epoch in tqdm(range(epochs), desc="Adversarial Supervised - training"): @@ -515,17 +515,17 @@ def fit( # Checkpoint if checkpoints_interval is not None and epoch % checkpoints_interval == 0: - print( + logger.info( f"step: {epoch}/{epochs}, s_loss: {np.round(np.sqrt(step_g_loss_s), 4)}" ) self.training_losses_history["adversarial_supervised"] = float( np.sqrt(step_g_loss_s) ) - print("Finished Training with Supervised Loss Only") + logger.info("Finished Training with Supervised Loss Only") # 3. Joint Training - print("Start Joint Training") + logger.info("Start Joint Training") # GAN with embedding network training for epoch in tqdm(range(epochs), desc="GAN with embedding - training"): @@ -554,12 +554,14 @@ def fit( Z_ = next(self.get_noise_batch()) step_d_loss = self._check_discriminator_loss(X_, Z_) if step_d_loss > 0.15: - print("Train Discriminator (discriminator does not work well yet)") + logger.info( + "Train Discriminator (discriminator does not work well yet)" + ) step_d_loss = self._train_discriminator(X_, Z_, self.discriminator_opt) # Print multiple checkpoints if checkpoints_interval is not None and epoch % checkpoints_interval == 0: - print( + logger.info( f"""step: {epoch}/{epochs}, d_loss: {np.round(step_d_loss, 4)}, g_loss_u: {np.round(step_g_loss_u, 4)}, @@ -582,7 +584,7 @@ def fit( _sample = self.generate(n_samples=len(data)) self.synthetic_data_generated_in_training[epoch] = _sample - print("Finished Joint Training") + logger.info("Finished Joint Training") return def generate(self, n_samples: int) -> TensorLike: