From fe9163fc188dcc955ca6f0202b8d9daaf7d207df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dion=20H=C3=A4fner?= Date: Tue, 2 Mar 2021 21:50:26 +0100 Subject: [PATCH 1/7] implement dynamic window sizes --- fowd/constants.py | 2 +- fowd/operators.py | 37 +++++++++++++++++++++++++++++++++++++ fowd/processing.py | 25 +++++++++++++++++++++++-- 3 files changed, 61 insertions(+), 3 deletions(-) diff --git a/fowd/constants.py b/fowd/constants.py index a5b779d..1093ca5 100644 --- a/fowd/constants.py +++ b/fowd/constants.py @@ -23,7 +23,7 @@ ) # sea state aggregation periods (in minutes) -SEA_STATE_INTERVALS = (10, 30) +SEA_STATE_INTERVALS = (10, 30, 'dynamic') # gravitational acceleration in m/s^2 GRAVITY = 9.81 diff --git a/fowd/operators.py b/fowd/operators.py index 9dfd52f..ad6d3ac 100644 --- a/fowd/operators.py +++ b/fowd/operators.py @@ -56,6 +56,43 @@ def find_wave_indices(z, start_idx=0): active = True +def compute_dynamic_window_size(t, elevation, min_length, max_length, num_windows=10): + best_window_length = None + best_window_stationarity = float('inf') + + if isinstance(min_length, np.timedelta64): + min_length = min_length / np.timedelta64(1, 's') + + if isinstance(max_length, np.timedelta64): + max_length = max_length / np.timedelta64(1, 's') + + for window_length in np.linspace(min_length, max_length, num_windows, dtype='int'): + window_stationarity = [] + for current_time_idx in np.linspace(0, window_length, 10, endpoint=False, dtype='int'): + if current_time_idx >= len(t): + continue + + window_stds = [] + while True: + next_time = t[current_time_idx] + np.timedelta64(window_length, 's') + if next_time > t[-1]: + break + next_time_idx = get_time_index(next_time, t) + window_stds.append(np.nanstd(elevation[current_time_idx:next_time_idx], ddof=1.5)) + current_time_idx = next_time_idx + + window_stds = np.asarray(window_stds) + window_stationarity.append(np.nanstd(window_stds[1:] / window_stds[:-1] - 1, ddof=1)) + + mean_window_stationarity = np.nanmean(window_stationarity) + + if mean_window_stationarity < best_window_stationarity: + best_window_length = window_length + best_window_stationarity = mean_window_stationarity + + return best_window_length + + def add_prefix(dic, prefix): """Adds a prefix to every key in given dictionary.""" return {f'{prefix}_{key}': value for key, value in dic.items()} diff --git a/fowd/processing.py b/fowd/processing.py index 68720e3..c9dbacc 100644 --- a/fowd/processing.py +++ b/fowd/processing.py @@ -25,7 +25,7 @@ from .operators import ( find_wave_indices, add_prefix, get_time_index, get_md5_hash, get_proc_version, get_station_meta, get_wave_parameters, get_sea_parameters, get_directional_parameters, - check_quality_flags, compute_significant_wave_height + check_quality_flags, compute_significant_wave_height, compute_dynamic_window_size, ) logger = logging.getLogger(__name__) @@ -264,6 +264,8 @@ def handle_output(wave_records, wave_params_history, num_flags_fired): postfix=dict(waves_processed=str(local_wave_id)), initial=start_idx ) + dynamic_sea_state_period = None + with strict_filelock(outfile), tqdm.tqdm(**pbar_kwargs) as pbar: # initialize output files handle_output(wave_records, wave_params_history, num_flags_fired) @@ -344,7 +346,23 @@ def handle_output(wave_records, wave_params_history, num_flags_fired): # compute sea state parameters for sea_state_period in SEA_STATE_INTERVALS: - offset = np.timedelta64(sea_state_period, 'm') + if sea_state_period == 'dynamic': + if local_wave_id % 1000 == 0 or dynamic_sea_state_period is None: + dynamic_window_idx = slice( + get_time_index(wave_params['start_time'] - np.timedelta64(12, 'h'), time), + wave_start, + ) + dynamic_sea_state_period = compute_dynamic_window_size( + time[dynamic_window_idx], + elevation[dynamic_window_idx], + min_length=np.timedelta64(10, 'm'), + max_length=np.timedelta64(60, 'm'), + num_windows=11, + ) + this_wave_records['sea_state_dynamic_window_length'] = dynamic_sea_state_period + offset = np.timedelta64(dynamic_sea_state_period, 's') + else: + offset = np.timedelta64(sea_state_period, 'm') sea_state_idx = slice( get_time_index(wave_params['start_time'] - offset, time), @@ -404,6 +422,9 @@ def handle_output(wave_records, wave_params_history, num_flags_fired): wave_records.clear() pbar.set_postfix(dict(waves_processed=str(local_wave_id))) + if local_wave_id > 10_000: + break + else: # all waves processed pbar.update(len(elevation) - last_wave_stop) From 3b558c8d301ff72230fb136923de8463c1e6b099 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dion=20H=C3=A4fner?= Date: Wed, 3 Mar 2021 13:32:20 +0100 Subject: [PATCH 2/7] :bug: --- fowd/operators.py | 7 ++++-- fowd/output.py | 57 +++++++++++++++++++++++++++++----------------- fowd/processing.py | 11 +++++---- 3 files changed, 48 insertions(+), 27 deletions(-) diff --git a/fowd/operators.py b/fowd/operators.py index ad6d3ac..5ff31cf 100644 --- a/fowd/operators.py +++ b/fowd/operators.py @@ -61,10 +61,10 @@ def compute_dynamic_window_size(t, elevation, min_length, max_length, num_window best_window_stationarity = float('inf') if isinstance(min_length, np.timedelta64): - min_length = min_length / np.timedelta64(1, 's') + min_length = int(min_length / np.timedelta64(1, 's')) if isinstance(max_length, np.timedelta64): - max_length = max_length / np.timedelta64(1, 's') + max_length = int(max_length / np.timedelta64(1, 's')) for window_length in np.linspace(min_length, max_length, num_windows, dtype='int'): window_stationarity = [] @@ -90,6 +90,9 @@ def compute_dynamic_window_size(t, elevation, min_length, max_length, num_window best_window_length = window_length best_window_stationarity = mean_window_stationarity + if best_window_length is None: + return min_length + return best_window_length diff --git a/fowd/output.py b/fowd/output.py index dfd1158..093c285 100644 --- a/fowd/output.py +++ b/fowd/output.py @@ -181,8 +181,23 @@ # sea state parameter metadata for interval in SEA_STATE_INTERVALS: + if not isinstance(interval, str): + interval = f'{interval}m' + + if interval == 'dynamic': + DATASET_VARIABLES.update({ + 'sea_state_dynamic_window_length': dict( + dims=('wave_id_local',), + dtype='int64', + attrs=dict( + long_name='Length of dynamically computed sea state window', + units='seconds', + ) + ) + }) + DATASET_VARIABLES.update({ - f'sea_state_{interval}m_start_time': dict( + f'sea_state_{interval}_start_time': dict( dims=('wave_id_local',), dtype='int64', attrs=dict( @@ -190,7 +205,7 @@ units=f'milliseconds since {TIME_ORIGIN}', ) ), - f'sea_state_{interval}m_end_time': dict( + f'sea_state_{interval}_end_time': dict( dims=('wave_id_local',), dtype='int64', attrs=dict( @@ -198,7 +213,7 @@ units=f'milliseconds since {TIME_ORIGIN}', ) ), - f'sea_state_{interval}m_significant_wave_height_spectral': dict( + f'sea_state_{interval}_significant_wave_height_spectral': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -206,7 +221,7 @@ units='meters', ) ), - f'sea_state_{interval}m_significant_wave_height_direct': dict( + f'sea_state_{interval}_significant_wave_height_direct': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -214,7 +229,7 @@ units='meters', ) ), - f'sea_state_{interval}m_maximum_wave_height': dict( + f'sea_state_{interval}_maximum_wave_height': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -222,7 +237,7 @@ units='meters', ) ), - f'sea_state_{interval}m_rel_maximum_wave_height': dict( + f'sea_state_{interval}_rel_maximum_wave_height': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -233,7 +248,7 @@ units='1', ) ), - f'sea_state_{interval}m_mean_period_direct': dict( + f'sea_state_{interval}_mean_period_direct': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -241,7 +256,7 @@ units='seconds', ) ), - f'sea_state_{interval}m_mean_period_spectral': dict( + f'sea_state_{interval}_mean_period_spectral': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -249,7 +264,7 @@ units='seconds', ) ), - f'sea_state_{interval}m_skewness': dict( + f'sea_state_{interval}_skewness': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -257,7 +272,7 @@ units='1', ) ), - f'sea_state_{interval}m_kurtosis': dict( + f'sea_state_{interval}_kurtosis': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -265,7 +280,7 @@ units='1', ) ), - f'sea_state_{interval}m_valid_data_ratio': dict( + f'sea_state_{interval}_valid_data_ratio': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -275,7 +290,7 @@ units='1', ) ), - f'sea_state_{interval}m_peak_wave_period': dict( + f'sea_state_{interval}_peak_wave_period': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -283,7 +298,7 @@ units='seconds', ) ), - f'sea_state_{interval}m_peak_wavelength': dict( + f'sea_state_{interval}_peak_wavelength': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -291,7 +306,7 @@ units='meters', ) ), - f'sea_state_{interval}m_steepness': dict( + f'sea_state_{interval}_steepness': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -299,7 +314,7 @@ units='1', ) ), - f'sea_state_{interval}m_bandwidth_peakedness': dict( + f'sea_state_{interval}_bandwidth_peakedness': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -310,7 +325,7 @@ units='1', ) ), - f'sea_state_{interval}m_bandwidth_narrowness': dict( + f'sea_state_{interval}_bandwidth_narrowness': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -318,7 +333,7 @@ units='1', ) ), - f'sea_state_{interval}m_benjamin_feir_index_peakedness': dict( + f'sea_state_{interval}_benjamin_feir_index_peakedness': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -326,7 +341,7 @@ units='1', ) ), - f'sea_state_{interval}m_benjamin_feir_index_narrowness': dict( + f'sea_state_{interval}_benjamin_feir_index_narrowness': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -334,7 +349,7 @@ units='1', ) ), - f'sea_state_{interval}m_crest_trough_correlation': dict( + f'sea_state_{interval}_crest_trough_correlation': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -344,7 +359,7 @@ valid_max=1, ) ), - f'sea_state_{interval}m_energy_in_frequency_interval': dict( + f'sea_state_{interval}_energy_in_frequency_interval': dict( dims=('wave_id_local', 'meta_frequency_band'), dtype='float32', attrs=dict( @@ -352,7 +367,7 @@ units='watts', ) ), - f'sea_state_{interval}m_rel_energy_in_frequency_interval': dict( + f'sea_state_{interval}_rel_energy_in_frequency_interval': dict( dims=('wave_id_local', 'meta_frequency_band'), dtype='float32', attrs=dict( diff --git a/fowd/processing.py b/fowd/processing.py index c9dbacc..8affa39 100644 --- a/fowd/processing.py +++ b/fowd/processing.py @@ -359,6 +359,7 @@ def handle_output(wave_records, wave_params_history, num_flags_fired): max_length=np.timedelta64(60, 'm'), num_windows=11, ) + this_wave_records['sea_state_dynamic_window_length'] = dynamic_sea_state_period offset = np.timedelta64(dynamic_sea_state_period, 's') else: @@ -385,8 +386,13 @@ def handle_output(wave_records, wave_params_history, num_flags_fired): water_depth ) + if isinstance(sea_state_period, str): + sea_state_prefix = f'sea_state_{sea_state_period}' + else: + sea_state_prefix = f'sea_state_{sea_state_period}m' + this_wave_records.update( - add_prefix(sea_state_params, f'sea_state_{sea_state_period}m') + add_prefix(sea_state_params, sea_state_prefix) ) # compute directional quantities @@ -422,9 +428,6 @@ def handle_output(wave_records, wave_params_history, num_flags_fired): wave_records.clear() pbar.set_postfix(dict(waves_processed=str(local_wave_id))) - if local_wave_id > 10_000: - break - else: # all waves processed pbar.update(len(elevation) - last_wave_stop) From 71f250a82851fbc7a6d69e3c34380662d46aca07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dion=20H=C3=A4fner?= Date: Thu, 4 Mar 2021 08:34:36 +0100 Subject: [PATCH 3/7] implement directionality parameter --- fowd/operators.py | 41 +++++++++++++++++++++++++++++++++++++++- fowd/processing.py | 9 +++++---- fowd/sanity/testcases.py | 7 +++++-- 3 files changed, 50 insertions(+), 7 deletions(-) diff --git a/fowd/operators.py b/fowd/operators.py index 5ff31cf..13e8da1 100644 --- a/fowd/operators.py +++ b/fowd/operators.py @@ -57,6 +57,16 @@ def find_wave_indices(z, start_idx=0): def compute_dynamic_window_size(t, elevation, min_length, max_length, num_windows=10): + """Compute best window size from data by minimizing fluctuations around mean energy drift. + + Reference: + + Fedele, Francesco, et al. “Large Nearshore Storm Waves off the Irish Coast.” + Scientific Reports, vol. 9, no. 1, 1, Nature Publishing Group, Oct. 2019, p. 15406. + www.nature.com, doi:10.1038/s41598-019-51706-8. + + """ + # TODO: introduce constants for some of these choices best_window_length = None best_window_stationarity = float('inf') @@ -185,7 +195,8 @@ def compute_ursell_number(wave_height, wavelength, water_depth): def integrate(y, x): """Computes numerical integral ∫ y(x) dx.""" - return np.trapz(y, x) + mask = np.isfinite(y) & np.isfinite(x) + return np.trapz(y[mask], x[mask]) def compute_elevation(displacement): @@ -461,6 +472,24 @@ def compute_dominant_direction(frequencies, direction, energy_density): return circular_weighted_average(frequencies, direction, weights=energy_density) +def compute_directionality_index(frequencies, spread, spectral_bandwidth, energy_density): + """Compute directionality index R. + + Reference: + + Fedele, Francesco, et al. “Large Nearshore Storm Waves off the Irish Coast.” + Scientific Reports, vol. 9, no. 1, 1, Nature Publishing Group, Oct. 2019, p. 15406. + www.nature.com, doi:10.1038/s41598-019-51706-8. + + """ + total_spread = ( + integrate(spread * energy_density, frequencies) + / integrate(energy_density, frequencies) + ) + total_spread_rad = np.radians(total_spread) + return total_spread_rad ** 2 / (2 * spectral_bandwidth ** 2) + + # QC def check_flag_a(zero_crossing_periods, threshold=QC_FLAG_A_THRESHOLD): @@ -726,9 +755,19 @@ def get_directional_parameters(time, frequencies, directional_spread, mean_direc assert len(dominant_directional_spreads) == len(FREQUENCY_INTERVALS) assert len(dominant_directions) == len(FREQUENCY_INTERVALS) + zeroth_moment = compute_nth_moment(frequencies, wave_spectral_density, 0) + first_moment = compute_nth_moment(frequencies, wave_spectral_density, 1) + spectral_bandwidth = compute_bandwidth_narrowness( + zeroth_moment, first_moment, frequencies, wave_spectral_density + ) + directionality_index = compute_directionality_index( + frequencies, directional_spread, spectral_bandwidth, wave_spectral_density + ) + return { 'sampling_time': time, 'dominant_spread_in_frequency_interval': dominant_directional_spreads, 'dominant_direction_in_frequency_interval': dominant_directions, 'peak_wave_direction': peak_wave_direction, + 'directionality_index': directionality_index, } diff --git a/fowd/processing.py b/fowd/processing.py index 8affa39..4cc2edf 100644 --- a/fowd/processing.py +++ b/fowd/processing.py @@ -348,10 +348,11 @@ def handle_output(wave_records, wave_params_history, num_flags_fired): for sea_state_period in SEA_STATE_INTERVALS: if sea_state_period == 'dynamic': if local_wave_id % 1000 == 0 or dynamic_sea_state_period is None: - dynamic_window_idx = slice( - get_time_index(wave_params['start_time'] - np.timedelta64(12, 'h'), time), - wave_start, + dynamic_window_start = get_time_index( + wave_params['start_time'] - np.timedelta64(12, 'h'), + time ) + dynamic_window_idx = slice(dynamic_window_start, wave_start) dynamic_sea_state_period = compute_dynamic_window_size( time[dynamic_window_idx], elevation[dynamic_window_idx], @@ -409,7 +410,7 @@ def handle_output(wave_records, wave_params_history, num_flags_fired): direction_args['direction_spread'][directional_time_idx], direction_args['direction_mean_direction'][directional_time_idx], direction_args['direction_energy_density'][directional_time_idx], - direction_args['direction_peak_direction'][directional_time_idx] + direction_args['direction_peak_direction'][directional_time_idx], ) last_directional_time_idx = directional_time_idx diff --git a/fowd/sanity/testcases.py b/fowd/sanity/testcases.py index 63149e4..85bb141 100644 --- a/fowd/sanity/testcases.py +++ b/fowd/sanity/testcases.py @@ -52,8 +52,8 @@ def single_group_test_case(t): def directional_test_case(spectral_params): f = np.linspace(1e-3, 1, 1000) spectrum = ochi_hubble_spectrum(f, **spectral_params) - directional_spread = np.linspace(0, 360 * 4, len(f)) % 360 - mean_direction = np.linspace(360 * 4, 0, len(f)) % 360 + directional_spread = np.linspace(0, 180, len(f)) + mean_direction = np.linspace(360, 0, len(f)) return dict( time=None, @@ -151,5 +151,8 @@ def ochi_component(f, swh, peak_period, shape): DIRECTIONAL_TEST_CASES = { 'swell_dominated_directional': directional_test_case( ochi_params['swell_dominated'], + ), + 'wind_dominated_directional': directional_test_case( + ochi_params['wind_dominated'], ) } From 14d249d75ec4fb1e2711d3326dca3939c13453f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dion=20H=C3=A4fner?= Date: Thu, 4 Mar 2021 14:36:57 +0100 Subject: [PATCH 4/7] square spread before averaging --- fowd/operators.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/fowd/operators.py b/fowd/operators.py index 13e8da1..dd85c76 100644 --- a/fowd/operators.py +++ b/fowd/operators.py @@ -482,12 +482,11 @@ def compute_directionality_index(frequencies, spread, spectral_bandwidth, energy www.nature.com, doi:10.1038/s41598-019-51706-8. """ - total_spread = ( - integrate(spread * energy_density, frequencies) + total_squared_spread = ( + integrate(np.radians(spread) ** 2 * energy_density, frequencies) / integrate(energy_density, frequencies) ) - total_spread_rad = np.radians(total_spread) - return total_spread_rad ** 2 / (2 * spectral_bandwidth ** 2) + return total_squared_spread / (2 * spectral_bandwidth ** 2) # QC From 63679e26cb5bdbe2856cfbc4a01c1ec73a6aa748 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dion=20H=C3=A4fner?= Date: Fri, 5 Mar 2021 15:51:19 +0100 Subject: [PATCH 5/7] add directionality index to output --- fowd/output.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/fowd/output.py b/fowd/output.py index 093c285..c4ec034 100644 --- a/fowd/output.py +++ b/fowd/output.py @@ -419,6 +419,18 @@ valid_max=360 ) ), + direction_directionality_index=dict( + dims=('wave_id_local',), + dtype='float32', + attrs=dict( + long_name=( + 'Directionality index R (squared ratio of directional spread and ' + 'spectral bandwidth)' + ), + units='1', + valid_min=0, + ) + ) ) From 160476e2cd5db51392e9e0cbf85cb871f4b22550 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dion=20H=C3=A4fner?= Date: Thu, 11 Mar 2021 18:03:25 +0100 Subject: [PATCH 6/7] clean up dynamic window computation --- fowd/constants.py | 33 ++++++++++++---- fowd/operators.py | 18 +++++---- fowd/output.py | 2 +- fowd/processing.py | 98 +++++++++++++++++++++++++++++----------------- 4 files changed, 101 insertions(+), 50 deletions(-) diff --git a/fowd/constants.py b/fowd/constants.py index 1093ca5..862095f 100644 --- a/fowd/constants.py +++ b/fowd/constants.py @@ -25,6 +25,18 @@ # sea state aggregation periods (in minutes) SEA_STATE_INTERVALS = (10, 30, 'dynamic') +# settings for dynamic sea state aggregation period +# the shortest and longest permitted dynamic window sizes (in minutes) +DYNAMIC_WINDOW_LENGTH_BOUNDS = (10, 60) +# time in minutes after which the dynamic window size is re-computed +DYNAMIC_WINDOW_UPDATE_FREQUENCY = 30 +# dynamic window calculation is based on this many minutes before and after current wave +DYNAMIC_WINDOW_REFERENCE_PERIOD = (12 * 60, 0) +# total number of different window lengths to try within length bounds +NUM_DYNAMIC_WINDOWS = 11 +# number of times each window is re-evaluated for better statistics +NUM_DYNAMIC_WINDOW_SAMPLES = 10 + # gravitational acceleration in m/s^2 GRAVITY = 9.81 @@ -32,13 +44,20 @@ DENSITY = 1024 # QC thresholds -QC_FLAG_A_THRESHOLD = 25 # maximum allowed zero-crossing period (in seconds) -QC_FLAG_B_THRESHOLD = 2 # maximum allowed multiple of limit rate of change -QC_FLAG_C_THRESHOLD = 10 # maximum allowed number of consecutive identical values -QC_FLAG_D_THRESHOLD = 8 # maximum allowed exceedance of surface elevation MADN -QC_FLAG_E_THRESHOLD = 2 # maximum float precision to which sampling rate has to be uniform -QC_FLAG_F_THRESHOLD = 0.05 # maximum allowed ratio of invalid data -QC_FLAG_G_THRESHOLD = 100 # minimum number of zero-crossing periods in wave history +# maximum allowed zero-crossing period (in seconds) +QC_FLAG_A_THRESHOLD = 25 +# maximum allowed multiple of limit rate of change +QC_FLAG_B_THRESHOLD = 2 +# maximum allowed number of consecutive identical values +QC_FLAG_C_THRESHOLD = 10 +# maximum allowed exceedance of surface elevation MADN +QC_FLAG_D_THRESHOLD = 8 +# maximum float precision to which sampling rate has to be uniform +QC_FLAG_E_THRESHOLD = 2 +# maximum allowed ratio of invalid data +QC_FLAG_F_THRESHOLD = 0.05 +# minimum number of zero-crossing periods in wave history +QC_FLAG_G_THRESHOLD = 100 # QC logging options # wave height (in units of sig. wave height) for which failed QC is logged diff --git a/fowd/operators.py b/fowd/operators.py index dd85c76..1c2a406 100644 --- a/fowd/operators.py +++ b/fowd/operators.py @@ -56,7 +56,7 @@ def find_wave_indices(z, start_idx=0): active = True -def compute_dynamic_window_size(t, elevation, min_length, max_length, num_windows=10): +def compute_dynamic_window_size(t, elevation, min_length, max_length, num_windows, num_samples): """Compute best window size from data by minimizing fluctuations around mean energy drift. Reference: @@ -66,7 +66,6 @@ def compute_dynamic_window_size(t, elevation, min_length, max_length, num_window www.nature.com, doi:10.1038/s41598-019-51706-8. """ - # TODO: introduce constants for some of these choices best_window_length = None best_window_stationarity = float('inf') @@ -76,17 +75,23 @@ def compute_dynamic_window_size(t, elevation, min_length, max_length, num_window if isinstance(max_length, np.timedelta64): max_length = int(max_length / np.timedelta64(1, 's')) - for window_length in np.linspace(min_length, max_length, num_windows, dtype='int'): + window_lengths = np.linspace(min_length, max_length, num_windows, dtype='int') + + for window_length in window_lengths: window_stationarity = [] - for current_time_idx in np.linspace(0, window_length, 10, endpoint=False, dtype='int'): - if current_time_idx >= len(t): + window_offsets = np.linspace(0, window_length, num_samples, endpoint=False, dtype='int') + + for window_offset in window_offsets: + if window_offset >= len(t): continue + current_time_idx = window_offset window_stds = [] while True: next_time = t[current_time_idx] + np.timedelta64(window_length, 's') if next_time > t[-1]: break + next_time_idx = get_time_index(next_time, t) window_stds.append(np.nanstd(elevation[current_time_idx:next_time_idx], ddof=1.5)) current_time_idx = next_time_idx @@ -260,9 +265,8 @@ def compute_spectral_density(elevation, sample_dt): nperseg = round(SPECTRUM_WINDOW_SIZE / sample_dt) nfft = 2 ** (math.ceil(math.log(nperseg, 2))) # round to next higher power of 2 return scipy.signal.welch( - elevation, 1 / sample_dt, + elevation, 1 / sample_dt, window='hann', nperseg=nperseg, nfft=nfft, noverlap=nperseg // 2, - window='hann', detrend=False # data is already detrended ) diff --git a/fowd/output.py b/fowd/output.py index c4ec034..7cc4ff2 100644 --- a/fowd/output.py +++ b/fowd/output.py @@ -191,7 +191,7 @@ dtype='int64', attrs=dict( long_name='Length of dynamically computed sea state window', - units='seconds', + units='minutes', ) ) }) diff --git a/fowd/processing.py b/fowd/processing.py index 4cc2edf..aa75b2a 100644 --- a/fowd/processing.py +++ b/fowd/processing.py @@ -19,7 +19,10 @@ from .constants import ( WAVE_HISTORY_LENGTH, SEA_STATE_INTERVALS, - QC_FAIL_LOG_THRESHOLD, QC_EXTREME_WAVE_LOG_THRESHOLD + QC_FAIL_LOG_THRESHOLD, QC_EXTREME_WAVE_LOG_THRESHOLD, + DYNAMIC_WINDOW_LENGTH_BOUNDS, DYNAMIC_WINDOW_UPDATE_FREQUENCY, + DYNAMIC_WINDOW_REFERENCE_PERIOD, NUM_DYNAMIC_WINDOWS, + NUM_DYNAMIC_WINDOW_SAMPLES ) from .operators import ( @@ -112,6 +115,30 @@ def is_same_version(version1, version2): return split_v1[:2] == split_v2[:2] +@contextlib.contextmanager +def strict_filelock(target_file): + """File-based lock that throws an exception if lock is already in place.""" + lockfile = f'{target_file}.lock' + if os.path.isfile(lockfile): + raise RuntimeError( + f'File {target_file} appears to be locked. ' + 'Make sure that no other process is accessing it, then remove manually.' + ) + + with open(lockfile, 'w'): + pass + + try: + yield + finally: + try: + os.remove(lockfile) + except OSError: + warnings.warn(f'Could not remove lock file {lockfile}') + + +# main functions + def initialize_processing(record_file, state_file, input_hash): """Parse temporary files to re-start processing if possible.""" default_params = dict( @@ -169,26 +196,37 @@ def initialize_processing(record_file, state_file, input_hash): ) -@contextlib.contextmanager -def strict_filelock(target_file): - """File-based lock that throws an exception if lock is already in place.""" - lockfile = f'{target_file}.lock' - if os.path.isfile(lockfile): - raise RuntimeError( - f'File {target_file} appears to be locked. ' - 'Make sure that no other process is accessing it, then remove manually.' - ) +def get_dynamic_window_size(current_window_size, current_time, time, elevation, last_updated=None): + skip_update = ( + last_updated is not None + and current_time - last_updated < np.timedelta64(DYNAMIC_WINDOW_UPDATE_FREQUENCY, 'm') + ) - with open(lockfile, 'w'): - pass + if skip_update: + return current_window_size, last_updated - try: - yield - finally: - try: - os.remove(lockfile) - except OSError: - warnings.warn(f'Could not remove lock file {lockfile}') + print("not skipping") + + ref_period_start, ref_period_end = DYNAMIC_WINDOW_REFERENCE_PERIOD + dynamic_window_start = get_time_index( + current_time - np.timedelta64(ref_period_start, 'm'), + time, + ) + dynamic_window_end = get_time_index( + current_time + np.timedelta64(ref_period_end, 'm'), + time, + ) + dynamic_window_idx = slice(dynamic_window_start, dynamic_window_end) + dynamic_sea_state_period = compute_dynamic_window_size( + time[dynamic_window_idx], + elevation[dynamic_window_idx], + min_length=np.timedelta64(DYNAMIC_WINDOW_LENGTH_BOUNDS[0], 'm'), + max_length=np.timedelta64(DYNAMIC_WINDOW_LENGTH_BOUNDS[1], 'm'), + num_windows=NUM_DYNAMIC_WINDOWS, + num_samples=NUM_DYNAMIC_WINDOW_SAMPLES, + ) + + return dynamic_sea_state_period // 60, current_time def compute_wave_records(time, elevation, elevation_normalized, outfile, statefile, @@ -264,7 +302,7 @@ def handle_output(wave_records, wave_params_history, num_flags_fired): postfix=dict(waves_processed=str(local_wave_id)), initial=start_idx ) - dynamic_sea_state_period = None + dynamic_sea_state_period = dynamic_period_last_update = None with strict_filelock(outfile), tqdm.tqdm(**pbar_kwargs) as pbar: # initialize output files @@ -347,22 +385,12 @@ def handle_output(wave_records, wave_params_history, num_flags_fired): # compute sea state parameters for sea_state_period in SEA_STATE_INTERVALS: if sea_state_period == 'dynamic': - if local_wave_id % 1000 == 0 or dynamic_sea_state_period is None: - dynamic_window_start = get_time_index( - wave_params['start_time'] - np.timedelta64(12, 'h'), - time - ) - dynamic_window_idx = slice(dynamic_window_start, wave_start) - dynamic_sea_state_period = compute_dynamic_window_size( - time[dynamic_window_idx], - elevation[dynamic_window_idx], - min_length=np.timedelta64(10, 'm'), - max_length=np.timedelta64(60, 'm'), - num_windows=11, - ) - + dynamic_sea_state_period, dynamic_period_last_update = get_dynamic_window_size( + dynamic_sea_state_period, wave_params["start_time"], time, elevation, + last_updated=dynamic_period_last_update + ) this_wave_records['sea_state_dynamic_window_length'] = dynamic_sea_state_period - offset = np.timedelta64(dynamic_sea_state_period, 's') + offset = np.timedelta64(dynamic_sea_state_period, 'm') else: offset = np.timedelta64(sea_state_period, 'm') From e83e2c6d5092452aa56da3574751a60525b07e6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dion=20H=C3=A4fner?= Date: Fri, 12 Mar 2021 23:03:13 +0100 Subject: [PATCH 7/7] spectral mean period is zero-crossing period --- fowd/operators.py | 2 +- fowd/output.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/fowd/operators.py b/fowd/operators.py index 1c2a406..6cb181e 100644 --- a/fowd/operators.py +++ b/fowd/operators.py @@ -352,7 +352,7 @@ def compute_nth_moment(frequencies, wave_spectral_density, n): def compute_mean_wave_period_spectral(zeroth_moment, frequencies, wave_spectral_density): - """Compute mean wave period from wave spectral density.""" + """Compute mean zero-crossing period from wave spectral density.""" second_moment = compute_nth_moment(frequencies, wave_spectral_density, 2) return np.sqrt(zeroth_moment / second_moment) diff --git a/fowd/output.py b/fowd/output.py index 7cc4ff2..ec222ab 100644 --- a/fowd/output.py +++ b/fowd/output.py @@ -260,7 +260,7 @@ dims=('wave_id_local',), dtype='float32', attrs=dict( - long_name='Mean wave period estimated from spectral density spectrum', + long_name='Mean zero-crossing period estimated from spectral density spectrum', units='seconds', ) ),