From 6013462aedcd01a00d5483b53692764365661dfd Mon Sep 17 00:00:00 2001 From: AdvaitChandorkar07 Date: Wed, 12 Jun 2024 23:50:53 +0530 Subject: [PATCH 01/15] Add STFT function to Function class Signed-off-by: AdvaitChandorkar07 --- rocketpy/mathutils/function.py | 71 ++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/rocketpy/mathutils/function.py b/rocketpy/mathutils/function.py index d10ffe89a..20f5dbf8f 100644 --- a/rocketpy/mathutils/function.py +++ b/rocketpy/mathutils/function.py @@ -1012,6 +1012,77 @@ def to_frequency_domain(self, lower, upper, sampling_frequency, remove_dc=True): extrapolation="zero", ) + def to_stft(self, lower, upper, sampling_frequency, window_size, step_size, remove_dc=True): + ''' + Performs the Short-Time Fourier Transform (STFT) of the Function and + returns the result. The STFT is computed by applying the Fourier transform + to overlapping windows of the Function. + + Parameters + ---------- + lower : float + Lower bound of the time range. + upper : float + Upper bound of the time range. + sampling_frequency : float + Sampling frequency at which to perform the Fourier transform. + window_size : float + Size of the window for the STFT, in seconds. + step_size : float + Step size for the window, in seconds. + remove_dc : bool, optional + If True, the DC component is removed from each window before computing the Fourier transform. + + Returns + ------- + list of Function + A list of Functions, each representing the STFT of a window. + + Examples + -------- + >>> from rocketpy import Function + >>> import numpy as np + >>> main_frequency = 10 # Hz + >>> time = np.linspace(0, 10, 1000) + >>> signal = np.sin(2 * np.pi * main_frequency * time) + >>> time_domain = Function(np.array([time, signal]).T) + >>> stft_result = time_domain.to_stft( + ... lower=0, upper=10, sampling_frequency=100, + ... window_size=1, step_size=0.1 + ... ) + >>> for window in stft_result: + ... peak_frequencies_index = np.where(window[:, 1] > 0.001) + ... peak_frequencies = window[peak_frequencies_index, 0] + ... print(peak_frequencies) + [[-10. 10.]] + ''' + # Get the time domain data + sampling_time_step = 1.0 / sampling_frequency + sampling_range = np.arange(lower, upper, sampling_time_step) + sampled_points = self(sampling_range) + window_samples = int(window_size * sampling_frequency) + step_samples = int(step_size * sampling_frequency) + + stft_results = [] + for start in range(0, len(sampled_points) - window_samples + 1, step_samples): + windowed_samples = sampled_points[start:start + window_samples] + if remove_dc: + windowed_samples -= np.mean(windowed_samples) + fourier_amplitude = np.abs(np.fft.fft(windowed_samples) / (window_samples / 2)) + fourier_frequencies = np.fft.fftfreq(window_samples, sampling_time_step) + stft_results.append( + Function( + source=np.array([fourier_frequencies, fourier_amplitude]).T, + inputs="Frequency (Hz)", + outputs="Amplitude", + interpolation="linear", + extrapolation="zero", + ) + ) + return stft_results + + + def low_pass_filter(self, alpha, file_path=None): """Implements a low pass filter with a moving average filter. This does not mutate the original Function object, but returns a new one with the From 027e7f0d6c0ab036c046c2ddf803d0a3ec1012ac Mon Sep 17 00:00:00 2001 From: AdvaitChandorkar07 Date: Thu, 13 Jun 2024 00:38:51 +0530 Subject: [PATCH 02/15] Added feature: Short-Time Fourier Transform function --- rocketpy/mathutils/function.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/rocketpy/mathutils/function.py b/rocketpy/mathutils/function.py index 20f5dbf8f..24c478a72 100644 --- a/rocketpy/mathutils/function.py +++ b/rocketpy/mathutils/function.py @@ -1012,7 +1012,9 @@ def to_frequency_domain(self, lower, upper, sampling_frequency, remove_dc=True): extrapolation="zero", ) - def to_stft(self, lower, upper, sampling_frequency, window_size, step_size, remove_dc=True): + def to_stft( + self, lower, upper, sampling_frequency, window_size, step_size, remove_dc=True + ): ''' Performs the Short-Time Fourier Transform (STFT) of the Function and returns the result. The STFT is computed by applying the Fourier transform @@ -1065,10 +1067,12 @@ def to_stft(self, lower, upper, sampling_frequency, window_size, step_size, remo stft_results = [] for start in range(0, len(sampled_points) - window_samples + 1, step_samples): - windowed_samples = sampled_points[start:start + window_samples] + windowed_samples = sampled_points[start : start + window_samples] if remove_dc: windowed_samples -= np.mean(windowed_samples) - fourier_amplitude = np.abs(np.fft.fft(windowed_samples) / (window_samples / 2)) + fourier_amplitude = np.abs( + np.fft.fft(windowed_samples) / (window_samples / 2) + ) fourier_frequencies = np.fft.fftfreq(window_samples, sampling_time_step) stft_results.append( Function( @@ -1080,8 +1084,6 @@ def to_stft(self, lower, upper, sampling_frequency, window_size, step_size, remo ) ) return stft_results - - def low_pass_filter(self, alpha, file_path=None): """Implements a low pass filter with a moving average filter. This does From 98fbb6d690e738e996db504f1ac0f7db6c8af442 Mon Sep 17 00:00:00 2001 From: AdvaitChandorkar07 Date: Thu, 13 Jun 2024 01:07:56 +0530 Subject: [PATCH 03/15] Added feature: Short-Time Fourier Transform function --- rocketpy/mathutils/function.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rocketpy/mathutils/function.py b/rocketpy/mathutils/function.py index 24c478a72..7247d0a6f 100644 --- a/rocketpy/mathutils/function.py +++ b/rocketpy/mathutils/function.py @@ -1055,7 +1055,7 @@ def to_stft( >>> for window in stft_result: ... peak_frequencies_index = np.where(window[:, 1] > 0.001) ... peak_frequencies = window[peak_frequencies_index, 0] - ... print(peak_frequencies) + >>> print(peak_frequencies) [[-10. 10.]] ''' # Get the time domain data From 4f682bc5363cedc737af6e0a69bf183bb2cbf907 Mon Sep 17 00:00:00 2001 From: AdvaitChandorkar07 <110400437+AdvaitChandorkar07@users.noreply.github.com> Date: Sat, 13 Jul 2024 18:11:06 +0530 Subject: [PATCH 04/15] "Variable name changes in stft" --- rocketpy/mathutils/function.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/rocketpy/mathutils/function.py b/rocketpy/mathutils/function.py index 7247d0a6f..4575e3940 100644 --- a/rocketpy/mathutils/function.py +++ b/rocketpy/mathutils/function.py @@ -1012,9 +1012,7 @@ def to_frequency_domain(self, lower, upper, sampling_frequency, remove_dc=True): extrapolation="zero", ) - def to_stft( - self, lower, upper, sampling_frequency, window_size, step_size, remove_dc=True - ): + def short_time_fft(self, lower, upper, sampling_frequency, window_size, step_size, remove_dc=True): ''' Performs the Short-Time Fourier Transform (STFT) of the Function and returns the result. The STFT is computed by applying the Fourier transform @@ -1062,18 +1060,17 @@ def to_stft( sampling_time_step = 1.0 / sampling_frequency sampling_range = np.arange(lower, upper, sampling_time_step) sampled_points = self(sampling_range) - window_samples = int(window_size * sampling_frequency) - step_samples = int(step_size * sampling_frequency) - + samples_per_window= int(window_size * sampling_frequency) + samples_skipped_per_step= int(step_size * sampling_frequency) stft_results = [] - for start in range(0, len(sampled_points) - window_samples + 1, step_samples): - windowed_samples = sampled_points[start : start + window_samples] + for start in range(0, len(sampled_points) - samples_per_window+ 1, samples_skipped_per_step): + windowed_samples = sampled_points[start : start + samples_per_window] if remove_dc: windowed_samples -= np.mean(windowed_samples) fourier_amplitude = np.abs( - np.fft.fft(windowed_samples) / (window_samples / 2) + np.fft.fft(windowed_samples) / (samples_per_window/ 2) ) - fourier_frequencies = np.fft.fftfreq(window_samples, sampling_time_step) + fourier_frequencies = np.fft.fftfreq(samples_per_window, sampling_time_step) stft_results.append( Function( source=np.array([fourier_frequencies, fourier_amplitude]).T, From a319a546c5f42ad8a4e15e93dbc77b60292a1995 Mon Sep 17 00:00:00 2001 From: AdvaitChandorkar07 <110400437+AdvaitChandorkar07@users.noreply.github.com> Date: Sat, 13 Jul 2024 18:17:32 +0530 Subject: [PATCH 05/15] "Variable and function name formatting" --- rocketpy/mathutils/function.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/rocketpy/mathutils/function.py b/rocketpy/mathutils/function.py index 4575e3940..b209f394b 100644 --- a/rocketpy/mathutils/function.py +++ b/rocketpy/mathutils/function.py @@ -1012,7 +1012,9 @@ def to_frequency_domain(self, lower, upper, sampling_frequency, remove_dc=True): extrapolation="zero", ) - def short_time_fft(self, lower, upper, sampling_frequency, window_size, step_size, remove_dc=True): + def short_time_fft( + self, lower, upper, sampling_frequency, window_size, step_size, remove_dc=True + ): ''' Performs the Short-Time Fourier Transform (STFT) of the Function and returns the result. The STFT is computed by applying the Fourier transform @@ -1060,15 +1062,17 @@ def short_time_fft(self, lower, upper, sampling_frequency, window_size, step_siz sampling_time_step = 1.0 / sampling_frequency sampling_range = np.arange(lower, upper, sampling_time_step) sampled_points = self(sampling_range) - samples_per_window= int(window_size * sampling_frequency) - samples_skipped_per_step= int(step_size * sampling_frequency) + samples_per_window = int(window_size * sampling_frequency) + samples_skipped_per_step = int(step_size * sampling_frequency) stft_results = [] - for start in range(0, len(sampled_points) - samples_per_window+ 1, samples_skipped_per_step): + for start in range( + 0, len(sampled_points) - samples_per_window + 1, samples_skipped_per_step + ): windowed_samples = sampled_points[start : start + samples_per_window] if remove_dc: windowed_samples -= np.mean(windowed_samples) fourier_amplitude = np.abs( - np.fft.fft(windowed_samples) / (samples_per_window/ 2) + np.fft.fft(windowed_samples) / (samples_per_window / 2) ) fourier_frequencies = np.fft.fftfreq(samples_per_window, sampling_time_step) stft_results.append( From cd35030d6707536835e7dad4b9913114de15d5cc Mon Sep 17 00:00:00 2001 From: AdvaitChandorkar07 <110400437+AdvaitChandorkar07@users.noreply.github.com> Date: Sun, 14 Jul 2024 17:40:11 +0530 Subject: [PATCH 06/15] "Better Example" --- rocketpy/mathutils/function.py | 37 ++++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/rocketpy/mathutils/function.py b/rocketpy/mathutils/function.py index af9ec0cfe..b93d49ff4 100644 --- a/rocketpy/mathutils/function.py +++ b/rocketpy/mathutils/function.py @@ -1044,19 +1044,34 @@ def short_time_fft( -------- >>> from rocketpy import Function >>> import numpy as np - >>> main_frequency = 10 # Hz - >>> time = np.linspace(0, 10, 1000) - >>> signal = np.sin(2 * np.pi * main_frequency * time) - >>> time_domain = Function(np.array([time, signal]).T) - >>> stft_result = time_domain.to_stft( - ... lower=0, upper=10, sampling_frequency=100, - ... window_size=1, step_size=0.1 + >>> import matplotlib.pyplot as plt + + >>> # Generate a signal with varying frequency + >>> T_x, N = 1 / 20, 1000 # 20 Hz sampling rate for 50 s signal + >>> t_x = np.arange(N) * T_x # time indexes for signal + >>> f_i = 1 * np.arctan((t_x - t_x[N // 2]) / 2) + 5 # varying frequency + >>> signal = np.sin(2 * np.pi * np.cumsum(f_i) * T_x) # the signal + + >>> # Create a Function object + >>> time_domain = Function(np.array([t_x, signal]).T) + + >>> # Perform the STFT + >>> stft_result = time_domain.short_time_fft( + ... lower=0, upper=50, sampling_frequency=20, + ... window_size=2, step_size=0.5 ... ) + + >>> # Plot the result + >>> fig, ax = plt.subplots(figsize=(10, 6)) >>> for window in stft_result: - ... peak_frequencies_index = np.where(window[:, 1] > 0.001) - ... peak_frequencies = window[peak_frequencies_index, 0] - >>> print(peak_frequencies) - [[-10. 10.]] + ... freq = window[:, 0] + ... amplitude = window[:, 1] + ... ax.plot(freq, amplitude) + + >>> ax.set_xlabel('Frequency (Hz)') + >>> ax.set_ylabel('Amplitude') + >>> ax.set_title('Short-Time Fourier Transform (STFT) Result') + >>> plt.show() ''' # Get the time domain data sampling_time_step = 1.0 / sampling_frequency From 511f00014107c60cc3d63627d6f1962fde82935b Mon Sep 17 00:00:00 2001 From: AdvaitChandorkar07 Date: Wed, 12 Jun 2024 23:50:53 +0530 Subject: [PATCH 07/15] Add STFT function to Function class Signed-off-by: AdvaitChandorkar07 --- rocketpy/mathutils/function.py | 71 ++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/rocketpy/mathutils/function.py b/rocketpy/mathutils/function.py index 55680d199..10f3505ee 100644 --- a/rocketpy/mathutils/function.py +++ b/rocketpy/mathutils/function.py @@ -1012,6 +1012,77 @@ def to_frequency_domain(self, lower, upper, sampling_frequency, remove_dc=True): extrapolation="zero", ) + def to_stft(self, lower, upper, sampling_frequency, window_size, step_size, remove_dc=True): + ''' + Performs the Short-Time Fourier Transform (STFT) of the Function and + returns the result. The STFT is computed by applying the Fourier transform + to overlapping windows of the Function. + + Parameters + ---------- + lower : float + Lower bound of the time range. + upper : float + Upper bound of the time range. + sampling_frequency : float + Sampling frequency at which to perform the Fourier transform. + window_size : float + Size of the window for the STFT, in seconds. + step_size : float + Step size for the window, in seconds. + remove_dc : bool, optional + If True, the DC component is removed from each window before computing the Fourier transform. + + Returns + ------- + list of Function + A list of Functions, each representing the STFT of a window. + + Examples + -------- + >>> from rocketpy import Function + >>> import numpy as np + >>> main_frequency = 10 # Hz + >>> time = np.linspace(0, 10, 1000) + >>> signal = np.sin(2 * np.pi * main_frequency * time) + >>> time_domain = Function(np.array([time, signal]).T) + >>> stft_result = time_domain.to_stft( + ... lower=0, upper=10, sampling_frequency=100, + ... window_size=1, step_size=0.1 + ... ) + >>> for window in stft_result: + ... peak_frequencies_index = np.where(window[:, 1] > 0.001) + ... peak_frequencies = window[peak_frequencies_index, 0] + ... print(peak_frequencies) + [[-10. 10.]] + ''' + # Get the time domain data + sampling_time_step = 1.0 / sampling_frequency + sampling_range = np.arange(lower, upper, sampling_time_step) + sampled_points = self(sampling_range) + window_samples = int(window_size * sampling_frequency) + step_samples = int(step_size * sampling_frequency) + + stft_results = [] + for start in range(0, len(sampled_points) - window_samples + 1, step_samples): + windowed_samples = sampled_points[start:start + window_samples] + if remove_dc: + windowed_samples -= np.mean(windowed_samples) + fourier_amplitude = np.abs(np.fft.fft(windowed_samples) / (window_samples / 2)) + fourier_frequencies = np.fft.fftfreq(window_samples, sampling_time_step) + stft_results.append( + Function( + source=np.array([fourier_frequencies, fourier_amplitude]).T, + inputs="Frequency (Hz)", + outputs="Amplitude", + interpolation="linear", + extrapolation="zero", + ) + ) + return stft_results + + + def low_pass_filter(self, alpha, file_path=None): """Implements a low pass filter with a moving average filter. This does not mutate the original Function object, but returns a new one with the From a2d373f14f05a265791c2ed33dd6271232b0b844 Mon Sep 17 00:00:00 2001 From: AdvaitChandorkar07 Date: Thu, 13 Jun 2024 00:38:51 +0530 Subject: [PATCH 08/15] Added feature: Short-Time Fourier Transform function --- rocketpy/mathutils/function.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/rocketpy/mathutils/function.py b/rocketpy/mathutils/function.py index 10f3505ee..c9361c85b 100644 --- a/rocketpy/mathutils/function.py +++ b/rocketpy/mathutils/function.py @@ -1012,7 +1012,9 @@ def to_frequency_domain(self, lower, upper, sampling_frequency, remove_dc=True): extrapolation="zero", ) - def to_stft(self, lower, upper, sampling_frequency, window_size, step_size, remove_dc=True): + def to_stft( + self, lower, upper, sampling_frequency, window_size, step_size, remove_dc=True + ): ''' Performs the Short-Time Fourier Transform (STFT) of the Function and returns the result. The STFT is computed by applying the Fourier transform @@ -1065,10 +1067,12 @@ def to_stft(self, lower, upper, sampling_frequency, window_size, step_size, remo stft_results = [] for start in range(0, len(sampled_points) - window_samples + 1, step_samples): - windowed_samples = sampled_points[start:start + window_samples] + windowed_samples = sampled_points[start : start + window_samples] if remove_dc: windowed_samples -= np.mean(windowed_samples) - fourier_amplitude = np.abs(np.fft.fft(windowed_samples) / (window_samples / 2)) + fourier_amplitude = np.abs( + np.fft.fft(windowed_samples) / (window_samples / 2) + ) fourier_frequencies = np.fft.fftfreq(window_samples, sampling_time_step) stft_results.append( Function( @@ -1080,8 +1084,6 @@ def to_stft(self, lower, upper, sampling_frequency, window_size, step_size, remo ) ) return stft_results - - def low_pass_filter(self, alpha, file_path=None): """Implements a low pass filter with a moving average filter. This does From 5f60c547fcbbfad130ee7b0b47cc90bd8b867398 Mon Sep 17 00:00:00 2001 From: AdvaitChandorkar07 Date: Thu, 13 Jun 2024 01:07:56 +0530 Subject: [PATCH 09/15] Added feature: Short-Time Fourier Transform function --- rocketpy/mathutils/function.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rocketpy/mathutils/function.py b/rocketpy/mathutils/function.py index c9361c85b..5b7f101da 100644 --- a/rocketpy/mathutils/function.py +++ b/rocketpy/mathutils/function.py @@ -1055,7 +1055,7 @@ def to_stft( >>> for window in stft_result: ... peak_frequencies_index = np.where(window[:, 1] > 0.001) ... peak_frequencies = window[peak_frequencies_index, 0] - ... print(peak_frequencies) + >>> print(peak_frequencies) [[-10. 10.]] ''' # Get the time domain data From 236552abfbe184900d309dc4d555f6968c533886 Mon Sep 17 00:00:00 2001 From: AdvaitChandorkar07 <110400437+AdvaitChandorkar07@users.noreply.github.com> Date: Sat, 13 Jul 2024 18:11:06 +0530 Subject: [PATCH 10/15] "Variable name changes in stft" --- rocketpy/mathutils/function.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/rocketpy/mathutils/function.py b/rocketpy/mathutils/function.py index 5b7f101da..37bd031dd 100644 --- a/rocketpy/mathutils/function.py +++ b/rocketpy/mathutils/function.py @@ -1012,9 +1012,7 @@ def to_frequency_domain(self, lower, upper, sampling_frequency, remove_dc=True): extrapolation="zero", ) - def to_stft( - self, lower, upper, sampling_frequency, window_size, step_size, remove_dc=True - ): + def short_time_fft(self, lower, upper, sampling_frequency, window_size, step_size, remove_dc=True): ''' Performs the Short-Time Fourier Transform (STFT) of the Function and returns the result. The STFT is computed by applying the Fourier transform @@ -1062,18 +1060,17 @@ def to_stft( sampling_time_step = 1.0 / sampling_frequency sampling_range = np.arange(lower, upper, sampling_time_step) sampled_points = self(sampling_range) - window_samples = int(window_size * sampling_frequency) - step_samples = int(step_size * sampling_frequency) - + samples_per_window= int(window_size * sampling_frequency) + samples_skipped_per_step= int(step_size * sampling_frequency) stft_results = [] - for start in range(0, len(sampled_points) - window_samples + 1, step_samples): - windowed_samples = sampled_points[start : start + window_samples] + for start in range(0, len(sampled_points) - samples_per_window+ 1, samples_skipped_per_step): + windowed_samples = sampled_points[start : start + samples_per_window] if remove_dc: windowed_samples -= np.mean(windowed_samples) fourier_amplitude = np.abs( - np.fft.fft(windowed_samples) / (window_samples / 2) + np.fft.fft(windowed_samples) / (samples_per_window/ 2) ) - fourier_frequencies = np.fft.fftfreq(window_samples, sampling_time_step) + fourier_frequencies = np.fft.fftfreq(samples_per_window, sampling_time_step) stft_results.append( Function( source=np.array([fourier_frequencies, fourier_amplitude]).T, From f141fe24b2c0bb133e4a9354fb7b86cc04b6f37e Mon Sep 17 00:00:00 2001 From: AdvaitChandorkar07 <110400437+AdvaitChandorkar07@users.noreply.github.com> Date: Sat, 13 Jul 2024 18:17:32 +0530 Subject: [PATCH 11/15] "Variable and function name formatting" --- rocketpy/mathutils/function.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/rocketpy/mathutils/function.py b/rocketpy/mathutils/function.py index 37bd031dd..af9ec0cfe 100644 --- a/rocketpy/mathutils/function.py +++ b/rocketpy/mathutils/function.py @@ -1012,7 +1012,9 @@ def to_frequency_domain(self, lower, upper, sampling_frequency, remove_dc=True): extrapolation="zero", ) - def short_time_fft(self, lower, upper, sampling_frequency, window_size, step_size, remove_dc=True): + def short_time_fft( + self, lower, upper, sampling_frequency, window_size, step_size, remove_dc=True + ): ''' Performs the Short-Time Fourier Transform (STFT) of the Function and returns the result. The STFT is computed by applying the Fourier transform @@ -1060,15 +1062,17 @@ def short_time_fft(self, lower, upper, sampling_frequency, window_size, step_siz sampling_time_step = 1.0 / sampling_frequency sampling_range = np.arange(lower, upper, sampling_time_step) sampled_points = self(sampling_range) - samples_per_window= int(window_size * sampling_frequency) - samples_skipped_per_step= int(step_size * sampling_frequency) + samples_per_window = int(window_size * sampling_frequency) + samples_skipped_per_step = int(step_size * sampling_frequency) stft_results = [] - for start in range(0, len(sampled_points) - samples_per_window+ 1, samples_skipped_per_step): + for start in range( + 0, len(sampled_points) - samples_per_window + 1, samples_skipped_per_step + ): windowed_samples = sampled_points[start : start + samples_per_window] if remove_dc: windowed_samples -= np.mean(windowed_samples) fourier_amplitude = np.abs( - np.fft.fft(windowed_samples) / (samples_per_window/ 2) + np.fft.fft(windowed_samples) / (samples_per_window / 2) ) fourier_frequencies = np.fft.fftfreq(samples_per_window, sampling_time_step) stft_results.append( From efc25831bb0e7b08440367ce2a5b6f8e7b9978ae Mon Sep 17 00:00:00 2001 From: AdvaitChandorkar07 <110400437+AdvaitChandorkar07@users.noreply.github.com> Date: Sun, 14 Jul 2024 17:40:11 +0530 Subject: [PATCH 12/15] "Better Example" --- rocketpy/mathutils/function.py | 37 ++++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/rocketpy/mathutils/function.py b/rocketpy/mathutils/function.py index af9ec0cfe..b93d49ff4 100644 --- a/rocketpy/mathutils/function.py +++ b/rocketpy/mathutils/function.py @@ -1044,19 +1044,34 @@ def short_time_fft( -------- >>> from rocketpy import Function >>> import numpy as np - >>> main_frequency = 10 # Hz - >>> time = np.linspace(0, 10, 1000) - >>> signal = np.sin(2 * np.pi * main_frequency * time) - >>> time_domain = Function(np.array([time, signal]).T) - >>> stft_result = time_domain.to_stft( - ... lower=0, upper=10, sampling_frequency=100, - ... window_size=1, step_size=0.1 + >>> import matplotlib.pyplot as plt + + >>> # Generate a signal with varying frequency + >>> T_x, N = 1 / 20, 1000 # 20 Hz sampling rate for 50 s signal + >>> t_x = np.arange(N) * T_x # time indexes for signal + >>> f_i = 1 * np.arctan((t_x - t_x[N // 2]) / 2) + 5 # varying frequency + >>> signal = np.sin(2 * np.pi * np.cumsum(f_i) * T_x) # the signal + + >>> # Create a Function object + >>> time_domain = Function(np.array([t_x, signal]).T) + + >>> # Perform the STFT + >>> stft_result = time_domain.short_time_fft( + ... lower=0, upper=50, sampling_frequency=20, + ... window_size=2, step_size=0.5 ... ) + + >>> # Plot the result + >>> fig, ax = plt.subplots(figsize=(10, 6)) >>> for window in stft_result: - ... peak_frequencies_index = np.where(window[:, 1] > 0.001) - ... peak_frequencies = window[peak_frequencies_index, 0] - >>> print(peak_frequencies) - [[-10. 10.]] + ... freq = window[:, 0] + ... amplitude = window[:, 1] + ... ax.plot(freq, amplitude) + + >>> ax.set_xlabel('Frequency (Hz)') + >>> ax.set_ylabel('Amplitude') + >>> ax.set_title('Short-Time Fourier Transform (STFT) Result') + >>> plt.show() ''' # Get the time domain data sampling_time_step = 1.0 / sampling_frequency From 03faf61ae4b5ee36ff87a5832de4f8a242f42d88 Mon Sep 17 00:00:00 2001 From: AdvaitChandorkar07 <110400437+AdvaitChandorkar07@users.noreply.github.com> Date: Thu, 18 Jul 2024 12:21:30 +0530 Subject: [PATCH 13/15] Fixed the doctest --- rocketpy/mathutils/function.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/rocketpy/mathutils/function.py b/rocketpy/mathutils/function.py index b93d49ff4..95b6245a4 100644 --- a/rocketpy/mathutils/function.py +++ b/rocketpy/mathutils/function.py @@ -1054,7 +1054,6 @@ def short_time_fft( >>> # Create a Function object >>> time_domain = Function(np.array([t_x, signal]).T) - >>> # Perform the STFT >>> stft_result = time_domain.short_time_fft( ... lower=0, upper=50, sampling_frequency=20, @@ -1066,11 +1065,14 @@ def short_time_fft( >>> for window in stft_result: ... freq = window[:, 0] ... amplitude = window[:, 1] - ... ax.plot(freq, amplitude) + ... _=ax.plot(freq, amplitude) >>> ax.set_xlabel('Frequency (Hz)') + Text(0.5, 0, 'Frequency (Hz)') >>> ax.set_ylabel('Amplitude') + Text(0, 0.5, 'Amplitude') >>> ax.set_title('Short-Time Fourier Transform (STFT) Result') + Text(0.5, 1.0, 'Short-Time Fourier Transform (STFT) Result') >>> plt.show() ''' # Get the time domain data From 3c7f548905b00ff7d6372977585d9f8891195a0f Mon Sep 17 00:00:00 2001 From: AdvaitChandorkar07 <110400437+AdvaitChandorkar07@users.noreply.github.com> Date: Thu, 25 Jul 2024 20:46:20 +0530 Subject: [PATCH 14/15] "Spectrogram example" --- python | 0 rocketpy/mathutils/function.py | 72 +++++++++++++++++++++++++--------- 2 files changed, 54 insertions(+), 18 deletions(-) create mode 100644 python diff --git a/python b/python new file mode 100644 index 000000000..e69de29bb diff --git a/rocketpy/mathutils/function.py b/rocketpy/mathutils/function.py index 95b6245a4..fc0359601 100644 --- a/rocketpy/mathutils/function.py +++ b/rocketpy/mathutils/function.py @@ -1013,8 +1013,14 @@ def to_frequency_domain(self, lower, upper, sampling_frequency, remove_dc=True): ) def short_time_fft( - self, lower, upper, sampling_frequency, window_size, step_size, remove_dc=True - ): + self, + lower, + upper, + sampling_frequency, + window_size, + step_size, + remove_dc=True, + only_positive=True): ''' Performs the Short-Time Fourier Transform (STFT) of the Function and returns the result. The STFT is computed by applying the Fourier transform @@ -1034,6 +1040,8 @@ def short_time_fft( Step size for the window, in seconds. remove_dc : bool, optional If True, the DC component is removed from each window before computing the Fourier transform. + only_positive: bool, optional + If True, only the positive frequencies are returned. Returns ------- @@ -1042,38 +1050,57 @@ def short_time_fft( Examples -------- + Adopted from (1) >>> from rocketpy import Function >>> import numpy as np >>> import matplotlib.pyplot as plt >>> # Generate a signal with varying frequency - >>> T_x, N = 1 / 20, 1000 # 20 Hz sampling rate for 50 s signal + >>> T_x, N = 1 / 20 , 1000 # 20 Hz sampling rate for 50 s signal >>> t_x = np.arange(N) * T_x # time indexes for signal - >>> f_i = 1 * np.arctan((t_x - t_x[N // 2]) / 2) + 5 # varying frequency + >>> f_i = 1 * np.arctan((t_x - t_x[N // 2]) / 2) + 5 # varying frequency >>> signal = np.sin(2 * np.pi * np.cumsum(f_i) * T_x) # the signal >>> # Create a Function object >>> time_domain = Function(np.array([t_x, signal]).T) >>> # Perform the STFT >>> stft_result = time_domain.short_time_fft( - ... lower=0, upper=50, sampling_frequency=20, + ... lower=0, upper=50, sampling_frequency=95, ... window_size=2, step_size=0.5 ... ) - >>> # Plot the result - >>> fig, ax = plt.subplots(figsize=(10, 6)) - >>> for window in stft_result: - ... freq = window[:, 0] - ... amplitude = window[:, 1] - ... _=ax.plot(freq, amplitude) - - >>> ax.set_xlabel('Frequency (Hz)') - Text(0.5, 0, 'Frequency (Hz)') - >>> ax.set_ylabel('Amplitude') - Text(0, 0.5, 'Amplitude') - >>> ax.set_title('Short-Time Fourier Transform (STFT) Result') - Text(0.5, 1.0, 'Short-Time Fourier Transform (STFT) Result') + >>> # Prepare data for plotting + >>> Sx = np.abs([window[:, 1] for window in stft_result]) + >>> t_lo, t_hi = t_x[0], t_x[-1] + >>> extent = [t_lo, t_hi, 0, 50] + >>> # Plot the spectrogram + >>> fig1, ax1 = plt.subplots(figsize=(10, 6)) + >>> im1 = ax1.imshow(Sx.T, origin='lower', aspect='auto', + ... extent=extent, cmap='viridis') + >>> _=ax1.set_title(rf"STFT (2$\,s$ Gaussian window, $\sigma_t=0.4\,$s)") + >>> _=ax1.set(xlabel=f"Time $t$ in seconds", + ... ylabel=f"Freq. $f$ in Hz)", + ... xlim=(t_lo, t_hi)) + >>> _=ax1.plot(t_x, f_i, 'r--', alpha=.5, label='$f_i(t)$') + >>> _=fig1.colorbar(im1, label="Magnitude $|S_x(t, f)|$") + + >>> # Shade areas where window slices stick out to the side + >>> for t0_, t1_ in [(t_lo, 1), + ... (49, t_hi)]: + ... _=ax1.axvspan(t0_, t1_, color='w', linewidth=0, alpha=.2) + + >>> # Mark signal borders with vertical line + >>> for t_ in [t_lo, t_hi]: + ... _=ax1.axvline(t_, color='y', linestyle='--', alpha=0.5) + + >>> # Add legend and finalize plot + >>> _=ax1.legend() + >>> fig1.tight_layout() >>> plt.show() + + References + --------------- + [1] https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.ShortTimeFFT.html ''' # Get the time domain data sampling_time_step = 1.0 / sampling_frequency @@ -1082,6 +1109,7 @@ def short_time_fft( samples_per_window = int(window_size * sampling_frequency) samples_skipped_per_step = int(step_size * sampling_frequency) stft_results = [] + for start in range( 0, len(sampled_points) - samples_per_window + 1, samples_skipped_per_step ): @@ -1092,6 +1120,13 @@ def short_time_fft( np.fft.fft(windowed_samples) / (samples_per_window / 2) ) fourier_frequencies = np.fft.fftfreq(samples_per_window, sampling_time_step) + + # Filter to keep only positive frequencies if specified + if only_positive: + positive_indices = fourier_frequencies > 0 + fourier_frequencies = fourier_frequencies[positive_indices] + fourier_amplitude = fourier_amplitude[positive_indices] + stft_results.append( Function( source=np.array([fourier_frequencies, fourier_amplitude]).T, @@ -1101,6 +1136,7 @@ def short_time_fft( extrapolation="zero", ) ) + return stft_results def low_pass_filter(self, alpha, file_path=None): From 5f235517e21f856a6a1c0d4b7185eed57de9f331 Mon Sep 17 00:00:00 2001 From: Gui-FernandesBR Date: Sat, 17 Aug 2024 22:52:32 -0300 Subject: [PATCH 15/15] small fixes to STFT function --- .vscode/settings.json | 3 ++ CHANGELOG.md | 3 +- python | 0 rocketpy/mathutils/function.py | 87 +++++++++++++++++++--------------- 4 files changed, 53 insertions(+), 40 deletions(-) delete mode 100644 python diff --git a/.vscode/settings.json b/.vscode/settings.json index ac2bb1cab..bc43e427c 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -78,6 +78,7 @@ "copybutton", "cstride", "csys", + "cumsum", "datapoints", "datetime", "dcsys", @@ -149,6 +150,7 @@ "IGRA", "imageio", "imread", + "imshow", "intc", "interp", "Interquartile", @@ -258,6 +260,7 @@ "SRTM", "SRTMGL", "Stano", + "STFT", "subintervals", "suptitle", "ticklabel", diff --git a/CHANGELOG.md b/CHANGELOG.md index dced5f94c..0f1828872 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,7 +32,8 @@ Attention: The newest changes should be on top --> ### Added -- ENH: Rocket Axis Definition [#635](https://github.com/RocketPy-Team/RocketPy/pull/635) +- ENH: Add STFT function to Function class [#620](https://github.com/RocketPy-Team/RocketPy/pull/620) +- ENH: Rocket Axis Definition [#635](https://github.com/RocketPy-Team/RocketPy/pull/635) ### Changed diff --git a/python b/python deleted file mode 100644 index e69de29bb..000000000 diff --git a/rocketpy/mathutils/function.py b/rocketpy/mathutils/function.py index 1c559a308..79680411c 100644 --- a/rocketpy/mathutils/function.py +++ b/rocketpy/mathutils/function.py @@ -1014,11 +1014,12 @@ def short_time_fft( window_size, step_size, remove_dc=True, - only_positive=True): - ''' + only_positive=True, + ): + r""" Performs the Short-Time Fourier Transform (STFT) of the Function and - returns the result. The STFT is computed by applying the Fourier transform - to overlapping windows of the Function. + returns the result. The STFT is computed by applying the Fourier + transform to overlapping windows of the Function. Parameters ---------- @@ -1033,69 +1034,77 @@ def short_time_fft( step_size : float Step size for the window, in seconds. remove_dc : bool, optional - If True, the DC component is removed from each window before computing the Fourier transform. + If True, the DC component is removed from each window before + computing the Fourier transform. only_positive: bool, optional If True, only the positive frequencies are returned. Returns ------- - list of Function + list[Function] A list of Functions, each representing the STFT of a window. Examples -------- - Adopted from (1) - >>> from rocketpy import Function + >>> import numpy as np >>> import matplotlib.pyplot as plt + >>> from rocketpy import Function + + Generate a signal with varying frequency: - >>> # Generate a signal with varying frequency >>> T_x, N = 1 / 20 , 1000 # 20 Hz sampling rate for 50 s signal >>> t_x = np.arange(N) * T_x # time indexes for signal >>> f_i = 1 * np.arctan((t_x - t_x[N // 2]) / 2) + 5 # varying frequency >>> signal = np.sin(2 * np.pi * np.cumsum(f_i) * T_x) # the signal - >>> # Create a Function object + Create the Function object and perform the STFT: + >>> time_domain = Function(np.array([t_x, signal]).T) - >>> # Perform the STFT >>> stft_result = time_domain.short_time_fft( - ... lower=0, upper=50, sampling_frequency=95, - ... window_size=2, step_size=0.5 + ... lower=0, + ... upper=50, + ... sampling_frequency=95, + ... window_size=2, + ... step_size=0.5, ... ) - >>> # Prepare data for plotting + Plot the spectrogram: + >>> Sx = np.abs([window[:, 1] for window in stft_result]) >>> t_lo, t_hi = t_x[0], t_x[-1] - >>> extent = [t_lo, t_hi, 0, 50] - >>> # Plot the spectrogram >>> fig1, ax1 = plt.subplots(figsize=(10, 6)) - >>> im1 = ax1.imshow(Sx.T, origin='lower', aspect='auto', - ... extent=extent, cmap='viridis') - >>> _=ax1.set_title(rf"STFT (2$\,s$ Gaussian window, $\sigma_t=0.4\,$s)") - >>> _=ax1.set(xlabel=f"Time $t$ in seconds", - ... ylabel=f"Freq. $f$ in Hz)", - ... xlim=(t_lo, t_hi)) - >>> _=ax1.plot(t_x, f_i, 'r--', alpha=.5, label='$f_i(t)$') - >>> _=fig1.colorbar(im1, label="Magnitude $|S_x(t, f)|$") - + >>> im1 = ax1.imshow( + ... Sx.T, + ... origin='lower', + ... aspect='auto', + ... extent=[t_lo, t_hi, 0, 50], + ... cmap='viridis' + ... ) + >>> _ = ax1.set_title(rf"STFT (2$\,s$ Gaussian window, $\sigma_t=0.4\,$s)") + >>> _ = ax1.set( + ... xlabel=f"Time $t$ in seconds", + ... ylabel=f"Freq. $f$ in Hz)", + ... xlim=(t_lo, t_hi) + ... ) + >>> _ = ax1.plot(t_x, f_i, 'r--', alpha=.5, label='$f_i(t)$') + >>> _ = fig1.colorbar(im1, label="Magnitude $|S_x(t, f)|$") >>> # Shade areas where window slices stick out to the side - >>> for t0_, t1_ in [(t_lo, 1), - ... (49, t_hi)]: - ... _=ax1.axvspan(t0_, t1_, color='w', linewidth=0, alpha=.2) - + >>> for t0_, t1_ in [(t_lo, 1), (49, t_hi)]: + ... _ = ax1.axvspan(t0_, t1_, color='w', linewidth=0, alpha=.2) >>> # Mark signal borders with vertical line >>> for t_ in [t_lo, t_hi]: - ... _=ax1.axvline(t_, color='y', linestyle='--', alpha=0.5) - + ... _ = ax1.axvline(t_, color='y', linestyle='--', alpha=0.5) >>> # Add legend and finalize plot - >>> _=ax1.legend() + >>> _ = ax1.legend() >>> fig1.tight_layout() - >>> plt.show() + >>> # plt.show() # uncomment to show the plot References - --------------- - [1] https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.ShortTimeFFT.html - ''' + ---------- + Example adapted from the SciPy documentation: + https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.ShortTimeFFT.html + """ # Get the time domain data sampling_time_step = 1.0 / sampling_frequency sampling_range = np.arange(lower, upper, sampling_time_step) @@ -1104,9 +1113,9 @@ def short_time_fft( samples_skipped_per_step = int(step_size * sampling_frequency) stft_results = [] - for start in range( - 0, len(sampled_points) - samples_per_window + 1, samples_skipped_per_step - ): + max_start = len(sampled_points) - samples_per_window + 1 + + for start in range(0, max_start, samples_skipped_per_step): windowed_samples = sampled_points[start : start + samples_per_window] if remove_dc: windowed_samples -= np.mean(windowed_samples)