From fa100097cf2f6a4fecbb15c20b3736cde7d3da3c Mon Sep 17 00:00:00 2001 From: "Thomas S. Binns" Date: Mon, 16 Sep 2024 16:35:04 +0200 Subject: [PATCH] [ENH] Make `SpatioSpectralFilter` class more similar to `scikit-learn` fit-transform classes. (#22) --- changelog.md | 2 + examples/plot_compute_waveshape_noisy_data.py | 8 +- src/pybispectra/cfc/aac.py | 4 +- src/pybispectra/cfc/pac.py | 2 +- src/pybispectra/cfc/ppc.py | 4 +- src/pybispectra/general/general.py | 2 +- src/pybispectra/utils/_utils.py | 6 +- src/pybispectra/utils/ged.py | 155 ++++++++++++++++-- src/pybispectra/waveshape/waveshape.py | 4 +- tests/test_ged.py | 42 ++++- 10 files changed, 192 insertions(+), 37 deletions(-) diff --git a/changelog.md b/changelog.md index 8be5b23..ae26bf2 100644 --- a/changelog.md +++ b/changelog.md @@ -7,10 +7,12 @@ ##### Bug Fixes - Fixed error where the number of subplots exceeding the number of nodes would cause plotting to fail. +- Fixed error where bandpass filter settings for the SSD method in `SpatioSpectralFilter` were not being applied correctly. ##### API - Changed the default value of `min_ratio` in `SpatioSpectralFilter.get_transformed_data()` from `1.0` to `-inf`. - Added the option to control whether a copy is returned from the `get_results()` method of all `Results...` classes and from `SpatioSpectralFilter.get_transformed_data()` (default behaviour returns a copy, like in previous versions). +- Added new `fit_ssd()`, `fit_hpmax()`, and `transform()` methods to the `SpatioSpectralFilter` class to bring it more in line with `scikit-learn` fit-transform classes. ##### Documentation - Added a new example for computing the bispectrum and threenorm using the general classes. diff --git a/examples/plot_compute_waveshape_noisy_data.py b/examples/plot_compute_waveshape_noisy_data.py index accdd92..a309ffa 100644 --- a/examples/plot_compute_waveshape_noisy_data.py +++ b/examples/plot_compute_waveshape_noisy_data.py @@ -88,10 +88,12 @@ # perform spatio-spectral filtering ssf = SpatioSpectralFilter(data=data, sampling_freq=sampling_freq, verbose=False) -ssf.fit_transform_hpmax(signal_bounds=(18, 22), noise_bounds=(15, 25), n_harmonics=2) +transformed_data = ssf.fit_transform_hpmax( + signal_bounds=(18, 22), noise_bounds=(15, 25), n_harmonics=2 +) -# return the first component of the filtered data -transformed_data = (ssf.get_transformed_data()[:, 0])[:, np.newaxis, :] +# select the first component of the filtered data +transformed_data = transformed_data[:, [0], :] print( f"Original timeseries data: [{data.shape[0]} epochs x {data.shape[1]} channel(s) x " diff --git a/src/pybispectra/cfc/aac.py b/src/pybispectra/cfc/aac.py index 1def5cf..b29770e 100644 --- a/src/pybispectra/cfc/aac.py +++ b/src/pybispectra/cfc/aac.py @@ -150,8 +150,8 @@ def _compute_aac(self) -> None: ) except MemoryError as error: # pragma: no cover raise MemoryError( - "Memory allocation for the bispectrum computation failed. Try reducing " - "the sampling frequency of the data, or reduce the precision of the " + "Memory allocation for the AAC computation failed. Try reducing the " + "sampling frequency of the data, or reduce the precision of the " "computation with `pybispectra.set_precision('single')`." ) from error diff --git a/src/pybispectra/cfc/pac.py b/src/pybispectra/cfc/pac.py index 85cf788..4158b13 100644 --- a/src/pybispectra/cfc/pac.py +++ b/src/pybispectra/cfc/pac.py @@ -309,7 +309,7 @@ def _compute_threenorm(self) -> None: ).transpose(1, 0, 2, 3) except MemoryError as error: # pragma: no cover raise MemoryError( - "Memory allocation for the bispectrum computation failed. Try reducing " + "Memory allocation for the threenorm computation failed. Try reducing " "the sampling frequency of the data, or reduce the precision of the " "computation with `pybispectra.set_precision('single')`." ) from error diff --git a/src/pybispectra/cfc/ppc.py b/src/pybispectra/cfc/ppc.py index b54d457..210ff57 100644 --- a/src/pybispectra/cfc/ppc.py +++ b/src/pybispectra/cfc/ppc.py @@ -150,8 +150,8 @@ def _compute_ppc(self) -> None: ) except MemoryError as error: # pragma: no cover raise MemoryError( - "Memory allocation for the bispectrum computation failed. Try reducing " - "the sampling frequency of the data, or reduce the precision of the " + "Memory allocation for the PPC computation failed. Try reducing the " + "sampling frequency of the data, or reduce the precision of the " "computation with `pybispectra.set_precision('single')`." ) from error diff --git a/src/pybispectra/general/general.py b/src/pybispectra/general/general.py index d64b1a1..1b3ad50 100644 --- a/src/pybispectra/general/general.py +++ b/src/pybispectra/general/general.py @@ -371,7 +371,7 @@ def _compute_threenorm(self) -> None: ).transpose(1, 0, 2, 3) except MemoryError as error: # pragma: no cover raise MemoryError( - "Memory allocation for the bispectrum computation failed. Try reducing " + "Memory allocation for the threenorm computation failed. Try reducing " "the sampling frequency of the data, or reduce the precision of the " "computation with `pybispectra.set_precision('single')`." ) from error diff --git a/src/pybispectra/utils/_utils.py b/src/pybispectra/utils/_utils.py index 67e7bb3..f2b2866 100644 --- a/src/pybispectra/utils/_utils.py +++ b/src/pybispectra/utils/_utils.py @@ -3,7 +3,7 @@ import numpy as np from mne import Info, create_info from mne.parallel import parallel_func -from mne.utils import ProgressBar +from mne.utils import ProgressBar, set_log_level from numba import njit from pybispectra.utils._defaults import _precision @@ -66,11 +66,15 @@ def _compute_in_parallel( parallel, my_parallel_func, _ = parallel_func( func, n_jobs, prefer=prefer, verbose=verbose ) + old_log_level = set_log_level( + verbose="INFO" if verbose else "WARNING", return_old_level=True + ) # need to set log level that is passed to tqdm for block_i in ProgressBar(range(n_blocks), mesg=message): idcs = _get_block_indices(block_i, n_steps, n_jobs) output[idcs] = parallel( my_parallel_func(**loop_kwargs[idx], **static_kwargs) for idx in idcs ) + set_log_level(verbose=old_log_level) # reset log level return output diff --git a/src/pybispectra/utils/ged.py b/src/pybispectra/utils/ged.py index 2f3b905..e782083 100644 --- a/src/pybispectra/utils/ged.py +++ b/src/pybispectra/utils/ged.py @@ -20,6 +20,7 @@ class SpatioSpectralFilter: Parameters ---------- data : ~numpy.ndarray, shape of [epochs, channels, times] + Data to perform spatiospectral filtering on. sampling_freq : int | float Sampling frequency (in Hz) of :attr:`data`. @@ -29,6 +30,15 @@ class SpatioSpectralFilter: Methods ------- + fit_hpmax : + Fit HPMax filters to the data. + + fit_ssd : + Fit SSD filters to the data. + + transform : + Transform the data with the fitted filters. + fit_transform_hpmax : Fit HPMax filters and transform the data. @@ -141,9 +151,12 @@ class SpatioSpectralFilter: filters = None patterns = None ratios = None + _ssd = None _transformed_data = None _fitted = False + _fitted_method = None + _transformed = False def __init__( self, @@ -214,7 +227,7 @@ def _sort_bandpass_filter(self, bandpass_filter: bool) -> None: if not isinstance(bandpass_filter, bool): raise TypeError("`bandpass_filter` must be a bool.") - self.bandpass_filter = True + self.bandpass_filter = bandpass_filter def _sort_n_harmonics(self, n_harmonics: int) -> None: """Sort harmonic use input.""" @@ -283,7 +296,7 @@ def _sort_csd_method(self, csd_method: str) -> None: if csd_method not in accepted_methods: raise ValueError("`csd_method` is not recognised.") - def fit_transform_ssd( + def fit_ssd( self, signal_bounds: tuple[int | float], noise_bounds: tuple[int | float], @@ -292,7 +305,7 @@ def fit_transform_ssd( indices: tuple[int] | None = None, rank: int | None = None, ) -> None: - """Fit SSD filters and transform the data. + """Fit SSD filters to the data. Parameters ---------- @@ -325,6 +338,8 @@ def fit_transform_ssd( ----- The SSD implementation in MNE is used to compute the filters (:class:`mne.decoding.SSD`). + + .. versionadded:: 1.2 """ self._sort_freq_bounds(signal_bounds, noise_bounds, signal_noise_gap) self._sort_bandpass_filter(bandpass_filter) @@ -342,6 +357,7 @@ def fit_transform_ssd( self._compute_ssd(info, filt_params_signal, filt_params_noise) self._fitted = True + self._fitted_method = "SSD" if self.verbose: print(" ... SSD filter fitting finished\n") @@ -397,7 +413,7 @@ def _compute_ssd( "PyBispectra Internal Error: channel types in `info` should all be 'eeg'. " "Please contact the PyBispectra developers." ) - ssd = SSD( + self._ssd = SSD( info, filt_params_signal, filt_params_noise, @@ -408,13 +424,13 @@ def _compute_ssd( return_filtered=self.bandpass_filter, rank={"eeg": self.rank}, ) - self._transformed_data = ssd.fit_transform(self.data[:, self.indices]) + self._ssd.fit(self.data[:, self.indices]) - self.filters = ssd.filters_ - self.patterns = ssd.patterns_ - self.ratios = ssd.eigvals_ + self.filters = self._ssd.filters_ + self.patterns = self._ssd.patterns_ + self.ratios = self._ssd.eigvals_ - def fit_transform_hpmax( + def fit_hpmax( self, signal_bounds: tuple[int | float], noise_bounds: tuple[int | float], @@ -428,7 +444,7 @@ def fit_transform_hpmax( mt_low_bias: bool = True, n_jobs: int = 1, ) -> None: - """Fit HPMax filters and transform the data. + """Fit HPMax filters to the data. Parameters ---------- @@ -484,6 +500,8 @@ def fit_transform_hpmax( MNE is used to compute the CSD, from which the covariance matrices are obtained :footcite:`Bartz2019` (:func:`mne.time_frequency.csd_array_multitaper` and :func:`mne.time_frequency.csd_array_fourier`). + + .. versionadded:: 1.2 """ self._sort_freq_bounds(signal_bounds, noise_bounds, 0.0) self._sort_n_harmonics(n_harmonics) @@ -506,6 +524,7 @@ def fit_transform_hpmax( self._compute_hpmax(csd, freqs) self._fitted = True + self._fitted_method = "HPMax" if self.verbose: print(" ... HPMax filter fitting finished\n") @@ -613,13 +632,6 @@ def _compute_hpmax(self, csd: np.ndarray, freqs: np.ndarray) -> None: self.patterns = np.linalg.pinv(self.filters).astype(_precision.real) self.ratios = eigvals[eig_idx].astype(_precision.real) - self._transformed_data = np.einsum( - "ijk,jl->ilk", - self.data[:, self.indices], - self.filters, - dtype=_precision.real, - ) - if self.verbose: print(" ... HPMax filter computation finished\n") @@ -694,6 +706,108 @@ def _project_cov_rank_subspace( return cov_signal, cov_noise, projection + def transform(self, data: np.ndarray | None = None) -> np.ndarray: + """Transform the data with the fitted filters. + + Parameters + ---------- + data : ~numpy.ndarray, shape of [epochs, channels, times] | None (default None) + Data to transform with the fitted filters. If :obj:`None`, the data used to + fit the filters is transformed. + + Returns + ------- + transformed_data : ~numpy.ndarray, shape of [epochs, components, times] + Transformed data. + + Notes + ----- + + .. versionadded:: 1.2 + """ + if not self._fitted: + raise ValueError( + "No filters have been fit. Please call `fit_ssd` or `fit_hpmax` before " + "transforming the data." + ) + + if data is None: + data = self.data + if not isinstance(data, np.ndarray): + raise TypeError("`data` must be a NumPy array.") + if data.ndim != 3: + raise ValueError("`data` must be a 3D array.") + if data.shape[1] != self.filters.shape[0]: + raise ValueError( + "`data` must have the same number of channels as the filters." + ) + + if self.verbose: + print("Transforming data with filters...\n") + + if self.bandpass_filter and self._fitted_method == "SSD": + self._transformed_data = self._ssd.transform(data) + else: + self._transformed_data = np.einsum( + "ijk,jl->ilk", + data[:, self.indices], + self.filters, + dtype=_precision.real, + ) + + if self.verbose: + print(" ... Data transformation finished\n") + + self._transformed = True + + return self._transformed_data + + def fit_transform_ssd(self, *args: tuple, **kwargs: dict) -> np.ndarray: + """Fit SSD filters and transform the data. + + Parameters + ---------- + args : tuple + Positional parameters to pass to :meth:`fit_ssd`. + + kwargs : dict + Keyword parameters to pass to :meth:`fit_ssd`. + + Returns + ------- + transformed_data : ~numpy.ndarray, shape of [epochs, components, times] + Transformed data. + + Notes + ----- + Equivalent to calling :meth:`fit_ssd` followed by :meth:`transform`. + """ + self.fit_ssd(*args, **kwargs) + return self.transform() + + def fit_transform_hpmax(self, *args: tuple, **kwargs: dict) -> np.ndarray: + """Fit HPMax filters and transform the data. + + Parameters + ---------- + args : tuple + Positional parameters to pass to :meth:`fit_hpmax`. + + kwargs : dict + Keyword parameters to pass to :meth:`fit_hpmax`. + + Returns + ------- + transformed_data : ~numpy.ndarray, shape of [epochs, components, times] + Transformed data. + + Notes + ----- + Equivalent to calling :meth:`fit_hpmax` followed by :meth:`transform`. + """ + self.fit_hpmax(*args, **kwargs) + return self.transform() + def get_transformed_data( self, min_ratio: int | float = -np.inf, copy: bool = True ) -> np.ndarray: @@ -724,6 +838,13 @@ def get_transformed_data( Raises a warning if no components have a signal-to-noise ratio > ``min_ratio`` and :attr:`verbose` is :obj:`True`. """ + if not self._transformed: + raise ValueError( + "No data has been transformed. Please call `transform`, " + "`fit_transform_ssd`, or `fit_transform_hpmax` before getting the " + "transformed data." + ) + if not isinstance(min_ratio, (int, float)): raise TypeError("`min_ratio` must be an int or a float") if not isinstance(copy, bool): diff --git a/src/pybispectra/waveshape/waveshape.py b/src/pybispectra/waveshape/waveshape.py index cc1a2ec..ebba34c 100644 --- a/src/pybispectra/waveshape/waveshape.py +++ b/src/pybispectra/waveshape/waveshape.py @@ -197,7 +197,7 @@ def _compute_bispectrum(self) -> None: (self._n_cons, 1, self._f1s.size, self._f2s.size), dtype=_precision.complex, ), - message="Processing connections...", + message="Processing channels...", n_jobs=self._n_jobs, verbose=self.verbose, prefer="processes", @@ -234,7 +234,7 @@ def _compute_threenorm(self) -> None: (self._n_cons, 1, self._f1s.size, self._f2s.size), dtype=_precision.real, ), - message="Processing connections...", + message="Processing channels...", n_jobs=self._n_jobs, verbose=self.verbose, prefer="processes", diff --git a/tests/test_ged.py b/tests/test_ged.py index 427a8df..953f86a 100644 --- a/tests/test_ged.py +++ b/tests/test_ged.py @@ -12,8 +12,8 @@ def test_error_catch(method: str) -> None: """Check that SpatioSpectralFilter class catches errors.""" n_chans = 3 n_epochs = 5 - n_times = 100 - sampling_freq = 100 + n_times = 200 + sampling_freq = 50 data = _generate_data(n_epochs, n_chans, n_times) signal_bounds = (10, 15) noise_bounds = (8, 17) @@ -196,9 +196,29 @@ def test_error_catch(method: str) -> None: signal_bounds=signal_bounds, noise_bounds=noise_bounds, n_jobs=-2 ) - # return transformed data - ssf.fit_transform_hpmax(signal_bounds=signal_bounds, noise_bounds=noise_bounds) + # transform data + with pytest.raises(ValueError, match="No filters have been fit."): + ssf.transform() + + # get transformed data + with pytest.raises(ValueError, match="No data has been transformed."): + ssf.get_transformed_data() + fit_method = ssf.fit_ssd if method == "ssd" else ssf.fit_hpmax + fit_method(signal_bounds=signal_bounds, noise_bounds=noise_bounds) + + with pytest.raises(TypeError, match="`data` must be a NumPy array."): + ssf.transform(data=data.tolist()) + with pytest.raises(ValueError, match="`data` must be a 3D array."): + ssf.transform(data=data[0]) + with pytest.raises( + ValueError, match="`data` must have the same number of channels as the filters." + ): + ssf.transform(data=data[:, 1:]) + + ssf.transform() + + # return transformed data with pytest.raises(TypeError, match="`min_ratio` must be an int or a float"): ssf.get_transformed_data(min_ratio=None) with pytest.warns( @@ -231,14 +251,13 @@ def test_ged_ssd_runs(bandpass_filter: bool, rank: int) -> None: ssf = SpatioSpectralFilter(data, sampling_freq) - ssf.fit_transform_ssd( + transformed_data = ssf.fit_transform_ssd( signal_bounds=signal_bounds, noise_bounds=noise_bounds, bandpass_filter=bandpass_filter, rank=rank, ) - transformed_data = ssf.get_transformed_data() assert isinstance( transformed_data, np.ndarray ), "`transformed_data` should be a NumPy array." @@ -247,6 +266,10 @@ def test_ged_ssd_runs(bandpass_filter: bool, rank: int) -> None: rank, n_times, ), "`transformed_data` should have shape (n_epochs, rank, n_times)." + assert np.allclose(transformed_data, ssf.get_transformed_data(), rtol=0, atol=0), ( + "data returned from `fit_transform_ssd()` and `get_transformed_data()` should " + "be identical." + ) if rank > 1: transformed_data = ssf.get_transformed_data( @@ -296,14 +319,13 @@ def test_ged_hpmax_runs(csd_method: str, rank: int) -> None: noise_bounds = (8, 17) ssf = SpatioSpectralFilter(data, sampling_freq) - ssf.fit_transform_hpmax( + transformed_data = ssf.fit_transform_hpmax( signal_bounds=signal_bounds, noise_bounds=noise_bounds, rank=rank, csd_method=csd_method, ) - transformed_data = ssf.get_transformed_data(min_ratio=ssf.ratios.min(), copy=False) assert isinstance( transformed_data, np.ndarray ), "`transformed_data` should be a NumPy array." @@ -312,6 +334,10 @@ def test_ged_hpmax_runs(csd_method: str, rank: int) -> None: rank, n_times, ), "`transformed_data` should have shape (n_epochs, rank, n_times)." + assert np.allclose(transformed_data, ssf.get_transformed_data(), rtol=0, atol=0), ( + "data returned from `fit_transform_hpmax()` and `get_transformed_data()` " + "should be identical." + ) assert isinstance(ssf.filters, np.ndarray), "`filters` should be a NumPy array." assert ssf.filters.shape == (