diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 7a27c61..122bd8b 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -3,10 +3,11 @@ tensorflow==2.9.1 tensorflow_probability==0.17.0 scipy>=1.7.3 numpy>=1.21.6 -optuna +dtaidistance networkx +optuna +prettytable seaborn scikit-learn -prettytable yfinance tqdm diff --git a/tests/test_augmentations.py b/tests/test_augmentations.py index 79f6452..fd52ba9 100644 --- a/tests/test_augmentations.py +++ b/tests/test_augmentations.py @@ -18,6 +18,7 @@ def test_base_compose(): tsgm.models.augmentations.Shuffle(), tsgm.models.augmentations.MagnitudeWarping(), tsgm.models.augmentations.WindowWarping(), + tsgm.models.augmentations.DTWBarycentricAveraging(), ]) def test_dimensions(aug_model): xs = np.array([[[1, 2, 3, 4], [1, 2, 3, 4]]]) @@ -77,3 +78,14 @@ def test_window_warping(): assert xs_gen.shape == (17, 2, 4) assert ys_gen.shape == (17, 1) assert np.allclose(ys_gen, np.ones((17, 1))) + + +def test_dtw_ba(): + xs = np.array([[[1, 2, 3, 4], [1, 2, 3, 4]]]) + ys = np.ones((xs.shape[0], 1)) + dtw_ba_aug = tsgm.models.augmentations.DTWBarycentricAveraging() + xs_gen, ys_gen = dtw_ba_aug.generate(X=xs, y=ys, n_samples=17) + assert xs_gen.shape == (17, 2, 4) + assert ys_gen.shape == (17, 1) + assert np.allclose(ys_gen, np.ones((17, 1))) + assert np.allclose(xs_gen, xs) diff --git a/tsgm/models/augmentations.py b/tsgm/models/augmentations.py index dba7464..1f84050 100644 --- a/tsgm/models/augmentations.py +++ b/tsgm/models/augmentations.py @@ -2,6 +2,7 @@ import numpy as np import random import scipy.interpolate +from dtaidistance import dtw_barycenter from typing import List, Dict, Any, Optional from tensorflow.python.types.core import TensorLike @@ -22,7 +23,9 @@ def _get_seeds(self, total_num: int, n_seeds: int) -> TensorLike: seeds_idx = np.random.choice(range(total_num), size=n_seeds, replace=True) return seeds_idx - def generate(self, X: TensorLike, y: Optional[TensorLike] = None, n_samples: int = 1) -> TensorLike: + def generate( + self, X: TensorLike, y: Optional[TensorLike] = None, n_samples: int = 1 + ) -> TensorLike: raise NotImplementedError @@ -66,13 +69,17 @@ def __init__( self.variance = variance self.mean = mean - def generate(self, X: TensorLike, y: Optional[TensorLike] = None, n_samples: int = 1) -> TensorLike: + def generate( + self, X: TensorLike, y: Optional[TensorLike] = None, n_samples: int = 1 + ) -> TensorLike: seeds_idx = self._get_seeds(total_num=X.shape[0], n_seeds=n_samples) sigma = self.variance**0.5 has_labels = y is not None if self.per_channel: - gauss = np.random.normal(self.mean, sigma, (n_samples, X.shape[1], X.shape[2])) + gauss = np.random.normal( + self.mean, sigma, (n_samples, X.shape[1], X.shape[2]) + ) else: gauss = np.random.normal(self.mean, sigma, (n_samples, X.shape[1])) gauss = np.expand_dims(gauss, -1) @@ -100,7 +107,9 @@ def __init__( super(SliceAndShuffle, self).__init__(per_feature) self.n_segments = n_segments - def generate(self, X: TensorLike, y: Optional[TensorLike] = None, n_samples: int = 1) -> TensorLike: + def generate( + self, X: TensorLike, y: Optional[TensorLike] = None, n_samples: int = 1 + ) -> TensorLike: assert 0 < self.n_segments < X.shape[1] seeds_idx = self._get_seeds(n_samples=X.shape[0], n_seeds=n_samples) @@ -146,12 +155,17 @@ def __init__(self): def _n_repeats(self, n: int, total_num: int) -> int: return math.ceil(n / total_num) - def generate(self, X: TensorLike, y: Optional[TensorLike] = None, n_samples: int = 1) -> TensorLike: + def generate( + self, X: TensorLike, y: Optional[TensorLike] = None, n_samples: int = 1 + ) -> TensorLike: seeds_idx = self._get_seeds(X.shape[0], n_samples) n_features = X.shape[2] n_repeats = self._n_repeats(n_samples, total_num=len(X)) - shuffle_ids = [np.random.choice(np.arange(n_features), n_features, replace=False) for _ in range(n_repeats)] + shuffle_ids = [ + np.random.choice(np.arange(n_features), n_features, replace=False) + for _ in range(n_repeats) + ] synthetic_data = [] has_labels = y is not None @@ -175,18 +189,29 @@ class MagnitudeWarping(BaseAugmenter): sample by convolving the data window with a smooth curve varying around one https://dl.acm.org/doi/pdf/10.1145/3136755.3136817 """ + def __init__(self): super(MagnitudeWarping, self).__init__(per_feature=False) - def generate(self, X: TensorLike, y: Optional[TensorLike] = None, n_samples: int = 1, sigma: float = 0.2, knot: int = 4) -> TensorLike: + def generate( + self, + X: TensorLike, + y: Optional[TensorLike] = None, + n_samples: int = 1, + sigma: float = 0.2, + knot: int = 4, + ) -> TensorLike: n_data = X.shape[0] n_timesteps = X.shape[1] n_features = X.shape[2] orig_steps = np.arange(n_timesteps) - random_warps = np.random.normal(loc=1.0, scale=sigma, size=(n_samples, knot + 2, n_features)) - warp_steps = (np.ones( - (n_features, 1)) * (np.linspace(0, n_timesteps - 1., num=knot + 2))).T + random_warps = np.random.normal( + loc=1.0, scale=sigma, size=(n_samples, knot + 2, n_features) + ) + warp_steps = ( + np.ones((n_features, 1)) * (np.linspace(0, n_timesteps - 1.0, num=knot + 2)) + ).T result = np.zeros((n_samples, n_timesteps, n_features)) has_labels = y is not None @@ -195,8 +220,14 @@ def generate(self, X: TensorLike, y: Optional[TensorLike] = None, n_samples: int for i in range(n_samples): random_sample_id = random.randint(0, n_data - 1) - warper = np.array([scipy.interpolate.CubicSpline( - warp_steps[:, dim], random_warps[i, :, dim])(orig_steps) for dim in range(n_features)]).T + warper = np.array( + [ + scipy.interpolate.CubicSpline( + warp_steps[:, dim], random_warps[i, :, dim] + )(orig_steps) + for dim in range(n_features) + ] + ).T result[i] = X[random_sample_id] * warper if has_labels: result_y[i] = y[random_sample_id] @@ -210,10 +241,18 @@ class WindowWarping(BaseAugmenter): """ https://halshs.archives-ouvertes.fr/halshs-01357973/document """ + def __init__(self): super(WindowWarping, self).__init__(per_feature=False) - def generate(self, X: TensorLike, y: Optional[TensorLike] = None, window_ratio=0.2, scales=[0.25, 1.0], n_samples=1): + def generate( + self, + X: TensorLike, + y: Optional[TensorLike] = None, + window_ratio=0.2, + scales=[0.25, 1.0], + n_samples=1, + ): n_data = X.shape[0] n_timesteps = X.shape[1] n_features = X.shape[2] @@ -221,8 +260,8 @@ def generate(self, X: TensorLike, y: Optional[TensorLike] = None, window_ratio=0 scales_per_sample = np.random.choice(scales, n_samples) warp_size = max(np.round(window_ratio * n_timesteps).astype(np.int64), 1) window_starts = np.random.randint( - low=0, high=n_timesteps - warp_size, - size=(n_samples)) + low=0, high=n_timesteps - warp_size, size=(n_samples) + ) window_ends = window_starts + warp_size result = np.zeros((n_samples, n_timesteps, n_features)) @@ -232,17 +271,20 @@ def generate(self, X: TensorLike, y: Optional[TensorLike] = None, window_ratio=0 for dim in range(n_features): random_sample_id = random.randint(0, n_data - 1) random_sample = X[random_sample_id] - start_seg = random_sample[:window_starts[i], dim] + start_seg = random_sample[: window_starts[i], dim] warp_ts_size = max(round(warp_size * scales_per_sample[i]), 1) window_seg = np.interp( x=np.linspace(0, warp_size - 1, num=warp_ts_size), xp=np.arange(warp_size), - fp=random_sample[window_starts[i] : window_ends[i], dim]) - end_seg = random_sample[window_ends[i]:, dim] + fp=random_sample[window_starts[i] : window_ends[i], dim], + ) + end_seg = random_sample[window_ends[i] :, dim] warped = np.concatenate((start_seg, window_seg, end_seg)) result[i, :, dim] = np.interp( np.arange(n_timesteps), - np.linspace(0, n_timesteps - 1., num=warped.size), warped).T + np.linspace(0, n_timesteps - 1.0, num=warped.size), + warped, + ).T if has_labels: result_y[i] = y[random_sample_id] @@ -250,3 +292,103 @@ def generate(self, X: TensorLike, y: Optional[TensorLike] = None, window_ratio=0 return result, result_y else: return result + + +class DTWBarycentricAveraging(BaseAugmenter): + """ + DTW Barycenter Averaging (DBA) [1] method estimated through + Expectation-Maximization algorithm [2] as in https://github.com/tslearn-team/tslearn/ + ---------- + References + ---------- + .. [1] F. Petitjean, A. Ketterlin & P. Gancarski. A global averaging method + for dynamic time warping, with applications to clustering. Pattern + Recognition, Elsevier, 2011, Vol. 44, Num. 3, pp. 678-693 + .. [2] D. Schultz and B. Jain. Nonsmooth Analysis and Subgradient Methods + for Averaging in Dynamic Time Warping Spaces. + Pattern Recognition, 74, 340-358. + """ + + def __init__(self): + super(DTWBarycentricAveraging, self).__init__(per_feature=False) + self.data = None + + def generate( + self, + X: TensorLike, + y: Optional[TensorLike] = None, + n_samples: int = 1, + seed_sample_size: Optional[int] = None, + seed_timeseries: Optional[TensorLike] = None, + **kwargs, + ) -> TensorLike: + """ + Parameters + ---------- + X : TensorLike, the timeseries dataset + y : TensorLike or None, the classes + n_samples : int, number of samples to generate (per class, if y is given) + seed_sample_size : int or None (default: None) + The number of timeseries to draw (per class) from the dataset before computing DTW_BA. + If None, use the entire set (per class). + seed_timeseries : array or None (default: None) + Initial timesteries to start from for the optimization process, with shape (original_size, d). + In case y is given, the shape of seed_timeseries is assumed to be (n_classes, original_size, d) + Returns + ------- + np.array of shape (n_samples, original_size, d) if y is None + or (n_classes * n_samples, original_size, d), + and np.array of labels (or None) + """ + self.data = X + _has_labels = y is not None + if _has_labels: + self.labels = y + _unique_classes = sorted(np.unique(y)) + _y = [] + _X = [] + for i, _label in enumerate(_unique_classes): + logger.info(f"Class {_label}...") + _X_class = X[np.ravel(y) == _label] + _seed_timeseries = None + if seed_timeseries is not None: + _seed_timeseries = seed_timeseries[i] + _y += [_label] * n_samples + _X.append( + self._dtwba( + X_subset=_X_class, + n_synth_samples=n_samples, + seed_sample_size=seed_sample_size, + seed_timeseries=_seed_timeseries, + **kwargs, + ) + ) + return np.concatenate(_X), np.array(_y).reshape(-1, 1) + else: + return self._dtwba( + X_subset=X, + n_synth_samples=n_samples, + seed_sample_size=seed_sample_size, + seed_timeseries=seed_timeseries, + **kwargs, + ) + + def _dtwba( + self, + X_subset: TensorLike, + n_synth_samples: int, + seed_sample_size: Optional[int], + seed_timeseries: Optional[TensorLike], + **kwargs, + ): + _samples = [] + for _ in range(n_synth_samples): + _samples.append( + dtw_barycenter.dba( + s=X_subset, + c=seed_timeseries, + nb_initial_samples=seed_sample_size, + **kwargs, + ) + ) + return np.array(_samples)