diff --git a/fowd/constants.py b/fowd/constants.py index a5b779d..862095f 100644 --- a/fowd/constants.py +++ b/fowd/constants.py @@ -23,7 +23,19 @@ ) # sea state aggregation periods (in minutes) -SEA_STATE_INTERVALS = (10, 30) +SEA_STATE_INTERVALS = (10, 30, 'dynamic') + +# settings for dynamic sea state aggregation period +# the shortest and longest permitted dynamic window sizes (in minutes) +DYNAMIC_WINDOW_LENGTH_BOUNDS = (10, 60) +# time in minutes after which the dynamic window size is re-computed +DYNAMIC_WINDOW_UPDATE_FREQUENCY = 30 +# dynamic window calculation is based on this many minutes before and after current wave +DYNAMIC_WINDOW_REFERENCE_PERIOD = (12 * 60, 0) +# total number of different window lengths to try within length bounds +NUM_DYNAMIC_WINDOWS = 11 +# number of times each window is re-evaluated for better statistics +NUM_DYNAMIC_WINDOW_SAMPLES = 10 # gravitational acceleration in m/s^2 GRAVITY = 9.81 @@ -32,13 +44,20 @@ DENSITY = 1024 # QC thresholds -QC_FLAG_A_THRESHOLD = 25 # maximum allowed zero-crossing period (in seconds) -QC_FLAG_B_THRESHOLD = 2 # maximum allowed multiple of limit rate of change -QC_FLAG_C_THRESHOLD = 10 # maximum allowed number of consecutive identical values -QC_FLAG_D_THRESHOLD = 8 # maximum allowed exceedance of surface elevation MADN -QC_FLAG_E_THRESHOLD = 2 # maximum float precision to which sampling rate has to be uniform -QC_FLAG_F_THRESHOLD = 0.05 # maximum allowed ratio of invalid data -QC_FLAG_G_THRESHOLD = 100 # minimum number of zero-crossing periods in wave history +# maximum allowed zero-crossing period (in seconds) +QC_FLAG_A_THRESHOLD = 25 +# maximum allowed multiple of limit rate of change +QC_FLAG_B_THRESHOLD = 2 +# maximum allowed number of consecutive identical values +QC_FLAG_C_THRESHOLD = 10 +# maximum allowed exceedance of surface elevation MADN +QC_FLAG_D_THRESHOLD = 8 +# maximum float precision to which sampling rate has to be uniform +QC_FLAG_E_THRESHOLD = 2 +# maximum allowed ratio of invalid data +QC_FLAG_F_THRESHOLD = 0.05 +# minimum number of zero-crossing periods in wave history +QC_FLAG_G_THRESHOLD = 100 # QC logging options # wave height (in units of sig. wave height) for which failed QC is logged diff --git a/fowd/operators.py b/fowd/operators.py index 9dfd52f..6cb181e 100644 --- a/fowd/operators.py +++ b/fowd/operators.py @@ -56,6 +56,61 @@ def find_wave_indices(z, start_idx=0): active = True +def compute_dynamic_window_size(t, elevation, min_length, max_length, num_windows, num_samples): + """Compute best window size from data by minimizing fluctuations around mean energy drift. + + Reference: + + Fedele, Francesco, et al. “Large Nearshore Storm Waves off the Irish Coast.” + Scientific Reports, vol. 9, no. 1, 1, Nature Publishing Group, Oct. 2019, p. 15406. + www.nature.com, doi:10.1038/s41598-019-51706-8. + + """ + best_window_length = None + best_window_stationarity = float('inf') + + if isinstance(min_length, np.timedelta64): + min_length = int(min_length / np.timedelta64(1, 's')) + + if isinstance(max_length, np.timedelta64): + max_length = int(max_length / np.timedelta64(1, 's')) + + window_lengths = np.linspace(min_length, max_length, num_windows, dtype='int') + + for window_length in window_lengths: + window_stationarity = [] + window_offsets = np.linspace(0, window_length, num_samples, endpoint=False, dtype='int') + + for window_offset in window_offsets: + if window_offset >= len(t): + continue + + current_time_idx = window_offset + window_stds = [] + while True: + next_time = t[current_time_idx] + np.timedelta64(window_length, 's') + if next_time > t[-1]: + break + + next_time_idx = get_time_index(next_time, t) + window_stds.append(np.nanstd(elevation[current_time_idx:next_time_idx], ddof=1.5)) + current_time_idx = next_time_idx + + window_stds = np.asarray(window_stds) + window_stationarity.append(np.nanstd(window_stds[1:] / window_stds[:-1] - 1, ddof=1)) + + mean_window_stationarity = np.nanmean(window_stationarity) + + if mean_window_stationarity < best_window_stationarity: + best_window_length = window_length + best_window_stationarity = mean_window_stationarity + + if best_window_length is None: + return min_length + + return best_window_length + + def add_prefix(dic, prefix): """Adds a prefix to every key in given dictionary.""" return {f'{prefix}_{key}': value for key, value in dic.items()} @@ -145,7 +200,8 @@ def compute_ursell_number(wave_height, wavelength, water_depth): def integrate(y, x): """Computes numerical integral ∫ y(x) dx.""" - return np.trapz(y, x) + mask = np.isfinite(y) & np.isfinite(x) + return np.trapz(y[mask], x[mask]) def compute_elevation(displacement): @@ -209,9 +265,8 @@ def compute_spectral_density(elevation, sample_dt): nperseg = round(SPECTRUM_WINDOW_SIZE / sample_dt) nfft = 2 ** (math.ceil(math.log(nperseg, 2))) # round to next higher power of 2 return scipy.signal.welch( - elevation, 1 / sample_dt, + elevation, 1 / sample_dt, window='hann', nperseg=nperseg, nfft=nfft, noverlap=nperseg // 2, - window='hann', detrend=False # data is already detrended ) @@ -297,7 +352,7 @@ def compute_nth_moment(frequencies, wave_spectral_density, n): def compute_mean_wave_period_spectral(zeroth_moment, frequencies, wave_spectral_density): - """Compute mean wave period from wave spectral density.""" + """Compute mean zero-crossing period from wave spectral density.""" second_moment = compute_nth_moment(frequencies, wave_spectral_density, 2) return np.sqrt(zeroth_moment / second_moment) @@ -421,6 +476,23 @@ def compute_dominant_direction(frequencies, direction, energy_density): return circular_weighted_average(frequencies, direction, weights=energy_density) +def compute_directionality_index(frequencies, spread, spectral_bandwidth, energy_density): + """Compute directionality index R. + + Reference: + + Fedele, Francesco, et al. “Large Nearshore Storm Waves off the Irish Coast.” + Scientific Reports, vol. 9, no. 1, 1, Nature Publishing Group, Oct. 2019, p. 15406. + www.nature.com, doi:10.1038/s41598-019-51706-8. + + """ + total_squared_spread = ( + integrate(np.radians(spread) ** 2 * energy_density, frequencies) + / integrate(energy_density, frequencies) + ) + return total_squared_spread / (2 * spectral_bandwidth ** 2) + + # QC def check_flag_a(zero_crossing_periods, threshold=QC_FLAG_A_THRESHOLD): @@ -686,9 +758,19 @@ def get_directional_parameters(time, frequencies, directional_spread, mean_direc assert len(dominant_directional_spreads) == len(FREQUENCY_INTERVALS) assert len(dominant_directions) == len(FREQUENCY_INTERVALS) + zeroth_moment = compute_nth_moment(frequencies, wave_spectral_density, 0) + first_moment = compute_nth_moment(frequencies, wave_spectral_density, 1) + spectral_bandwidth = compute_bandwidth_narrowness( + zeroth_moment, first_moment, frequencies, wave_spectral_density + ) + directionality_index = compute_directionality_index( + frequencies, directional_spread, spectral_bandwidth, wave_spectral_density + ) + return { 'sampling_time': time, 'dominant_spread_in_frequency_interval': dominant_directional_spreads, 'dominant_direction_in_frequency_interval': dominant_directions, 'peak_wave_direction': peak_wave_direction, + 'directionality_index': directionality_index, } diff --git a/fowd/output.py b/fowd/output.py index dfd1158..ec222ab 100644 --- a/fowd/output.py +++ b/fowd/output.py @@ -181,8 +181,23 @@ # sea state parameter metadata for interval in SEA_STATE_INTERVALS: + if not isinstance(interval, str): + interval = f'{interval}m' + + if interval == 'dynamic': + DATASET_VARIABLES.update({ + 'sea_state_dynamic_window_length': dict( + dims=('wave_id_local',), + dtype='int64', + attrs=dict( + long_name='Length of dynamically computed sea state window', + units='minutes', + ) + ) + }) + DATASET_VARIABLES.update({ - f'sea_state_{interval}m_start_time': dict( + f'sea_state_{interval}_start_time': dict( dims=('wave_id_local',), dtype='int64', attrs=dict( @@ -190,7 +205,7 @@ units=f'milliseconds since {TIME_ORIGIN}', ) ), - f'sea_state_{interval}m_end_time': dict( + f'sea_state_{interval}_end_time': dict( dims=('wave_id_local',), dtype='int64', attrs=dict( @@ -198,7 +213,7 @@ units=f'milliseconds since {TIME_ORIGIN}', ) ), - f'sea_state_{interval}m_significant_wave_height_spectral': dict( + f'sea_state_{interval}_significant_wave_height_spectral': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -206,7 +221,7 @@ units='meters', ) ), - f'sea_state_{interval}m_significant_wave_height_direct': dict( + f'sea_state_{interval}_significant_wave_height_direct': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -214,7 +229,7 @@ units='meters', ) ), - f'sea_state_{interval}m_maximum_wave_height': dict( + f'sea_state_{interval}_maximum_wave_height': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -222,7 +237,7 @@ units='meters', ) ), - f'sea_state_{interval}m_rel_maximum_wave_height': dict( + f'sea_state_{interval}_rel_maximum_wave_height': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -233,7 +248,7 @@ units='1', ) ), - f'sea_state_{interval}m_mean_period_direct': dict( + f'sea_state_{interval}_mean_period_direct': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -241,15 +256,15 @@ units='seconds', ) ), - f'sea_state_{interval}m_mean_period_spectral': dict( + f'sea_state_{interval}_mean_period_spectral': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( - long_name='Mean wave period estimated from spectral density spectrum', + long_name='Mean zero-crossing period estimated from spectral density spectrum', units='seconds', ) ), - f'sea_state_{interval}m_skewness': dict( + f'sea_state_{interval}_skewness': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -257,7 +272,7 @@ units='1', ) ), - f'sea_state_{interval}m_kurtosis': dict( + f'sea_state_{interval}_kurtosis': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -265,7 +280,7 @@ units='1', ) ), - f'sea_state_{interval}m_valid_data_ratio': dict( + f'sea_state_{interval}_valid_data_ratio': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -275,7 +290,7 @@ units='1', ) ), - f'sea_state_{interval}m_peak_wave_period': dict( + f'sea_state_{interval}_peak_wave_period': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -283,7 +298,7 @@ units='seconds', ) ), - f'sea_state_{interval}m_peak_wavelength': dict( + f'sea_state_{interval}_peak_wavelength': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -291,7 +306,7 @@ units='meters', ) ), - f'sea_state_{interval}m_steepness': dict( + f'sea_state_{interval}_steepness': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -299,7 +314,7 @@ units='1', ) ), - f'sea_state_{interval}m_bandwidth_peakedness': dict( + f'sea_state_{interval}_bandwidth_peakedness': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -310,7 +325,7 @@ units='1', ) ), - f'sea_state_{interval}m_bandwidth_narrowness': dict( + f'sea_state_{interval}_bandwidth_narrowness': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -318,7 +333,7 @@ units='1', ) ), - f'sea_state_{interval}m_benjamin_feir_index_peakedness': dict( + f'sea_state_{interval}_benjamin_feir_index_peakedness': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -326,7 +341,7 @@ units='1', ) ), - f'sea_state_{interval}m_benjamin_feir_index_narrowness': dict( + f'sea_state_{interval}_benjamin_feir_index_narrowness': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -334,7 +349,7 @@ units='1', ) ), - f'sea_state_{interval}m_crest_trough_correlation': dict( + f'sea_state_{interval}_crest_trough_correlation': dict( dims=('wave_id_local',), dtype='float32', attrs=dict( @@ -344,7 +359,7 @@ valid_max=1, ) ), - f'sea_state_{interval}m_energy_in_frequency_interval': dict( + f'sea_state_{interval}_energy_in_frequency_interval': dict( dims=('wave_id_local', 'meta_frequency_band'), dtype='float32', attrs=dict( @@ -352,7 +367,7 @@ units='watts', ) ), - f'sea_state_{interval}m_rel_energy_in_frequency_interval': dict( + f'sea_state_{interval}_rel_energy_in_frequency_interval': dict( dims=('wave_id_local', 'meta_frequency_band'), dtype='float32', attrs=dict( @@ -404,6 +419,18 @@ valid_max=360 ) ), + direction_directionality_index=dict( + dims=('wave_id_local',), + dtype='float32', + attrs=dict( + long_name=( + 'Directionality index R (squared ratio of directional spread and ' + 'spectral bandwidth)' + ), + units='1', + valid_min=0, + ) + ) ) diff --git a/fowd/processing.py b/fowd/processing.py index 68720e3..aa75b2a 100644 --- a/fowd/processing.py +++ b/fowd/processing.py @@ -19,13 +19,16 @@ from .constants import ( WAVE_HISTORY_LENGTH, SEA_STATE_INTERVALS, - QC_FAIL_LOG_THRESHOLD, QC_EXTREME_WAVE_LOG_THRESHOLD + QC_FAIL_LOG_THRESHOLD, QC_EXTREME_WAVE_LOG_THRESHOLD, + DYNAMIC_WINDOW_LENGTH_BOUNDS, DYNAMIC_WINDOW_UPDATE_FREQUENCY, + DYNAMIC_WINDOW_REFERENCE_PERIOD, NUM_DYNAMIC_WINDOWS, + NUM_DYNAMIC_WINDOW_SAMPLES ) from .operators import ( find_wave_indices, add_prefix, get_time_index, get_md5_hash, get_proc_version, get_station_meta, get_wave_parameters, get_sea_parameters, get_directional_parameters, - check_quality_flags, compute_significant_wave_height + check_quality_flags, compute_significant_wave_height, compute_dynamic_window_size, ) logger = logging.getLogger(__name__) @@ -112,6 +115,30 @@ def is_same_version(version1, version2): return split_v1[:2] == split_v2[:2] +@contextlib.contextmanager +def strict_filelock(target_file): + """File-based lock that throws an exception if lock is already in place.""" + lockfile = f'{target_file}.lock' + if os.path.isfile(lockfile): + raise RuntimeError( + f'File {target_file} appears to be locked. ' + 'Make sure that no other process is accessing it, then remove manually.' + ) + + with open(lockfile, 'w'): + pass + + try: + yield + finally: + try: + os.remove(lockfile) + except OSError: + warnings.warn(f'Could not remove lock file {lockfile}') + + +# main functions + def initialize_processing(record_file, state_file, input_hash): """Parse temporary files to re-start processing if possible.""" default_params = dict( @@ -169,26 +196,37 @@ def initialize_processing(record_file, state_file, input_hash): ) -@contextlib.contextmanager -def strict_filelock(target_file): - """File-based lock that throws an exception if lock is already in place.""" - lockfile = f'{target_file}.lock' - if os.path.isfile(lockfile): - raise RuntimeError( - f'File {target_file} appears to be locked. ' - 'Make sure that no other process is accessing it, then remove manually.' - ) +def get_dynamic_window_size(current_window_size, current_time, time, elevation, last_updated=None): + skip_update = ( + last_updated is not None + and current_time - last_updated < np.timedelta64(DYNAMIC_WINDOW_UPDATE_FREQUENCY, 'm') + ) - with open(lockfile, 'w'): - pass + if skip_update: + return current_window_size, last_updated - try: - yield - finally: - try: - os.remove(lockfile) - except OSError: - warnings.warn(f'Could not remove lock file {lockfile}') + print("not skipping") + + ref_period_start, ref_period_end = DYNAMIC_WINDOW_REFERENCE_PERIOD + dynamic_window_start = get_time_index( + current_time - np.timedelta64(ref_period_start, 'm'), + time, + ) + dynamic_window_end = get_time_index( + current_time + np.timedelta64(ref_period_end, 'm'), + time, + ) + dynamic_window_idx = slice(dynamic_window_start, dynamic_window_end) + dynamic_sea_state_period = compute_dynamic_window_size( + time[dynamic_window_idx], + elevation[dynamic_window_idx], + min_length=np.timedelta64(DYNAMIC_WINDOW_LENGTH_BOUNDS[0], 'm'), + max_length=np.timedelta64(DYNAMIC_WINDOW_LENGTH_BOUNDS[1], 'm'), + num_windows=NUM_DYNAMIC_WINDOWS, + num_samples=NUM_DYNAMIC_WINDOW_SAMPLES, + ) + + return dynamic_sea_state_period // 60, current_time def compute_wave_records(time, elevation, elevation_normalized, outfile, statefile, @@ -264,6 +302,8 @@ def handle_output(wave_records, wave_params_history, num_flags_fired): postfix=dict(waves_processed=str(local_wave_id)), initial=start_idx ) + dynamic_sea_state_period = dynamic_period_last_update = None + with strict_filelock(outfile), tqdm.tqdm(**pbar_kwargs) as pbar: # initialize output files handle_output(wave_records, wave_params_history, num_flags_fired) @@ -344,7 +384,15 @@ def handle_output(wave_records, wave_params_history, num_flags_fired): # compute sea state parameters for sea_state_period in SEA_STATE_INTERVALS: - offset = np.timedelta64(sea_state_period, 'm') + if sea_state_period == 'dynamic': + dynamic_sea_state_period, dynamic_period_last_update = get_dynamic_window_size( + dynamic_sea_state_period, wave_params["start_time"], time, elevation, + last_updated=dynamic_period_last_update + ) + this_wave_records['sea_state_dynamic_window_length'] = dynamic_sea_state_period + offset = np.timedelta64(dynamic_sea_state_period, 'm') + else: + offset = np.timedelta64(sea_state_period, 'm') sea_state_idx = slice( get_time_index(wave_params['start_time'] - offset, time), @@ -367,8 +415,13 @@ def handle_output(wave_records, wave_params_history, num_flags_fired): water_depth ) + if isinstance(sea_state_period, str): + sea_state_prefix = f'sea_state_{sea_state_period}' + else: + sea_state_prefix = f'sea_state_{sea_state_period}m' + this_wave_records.update( - add_prefix(sea_state_params, f'sea_state_{sea_state_period}m') + add_prefix(sea_state_params, sea_state_prefix) ) # compute directional quantities @@ -385,7 +438,7 @@ def handle_output(wave_records, wave_params_history, num_flags_fired): direction_args['direction_spread'][directional_time_idx], direction_args['direction_mean_direction'][directional_time_idx], direction_args['direction_energy_density'][directional_time_idx], - direction_args['direction_peak_direction'][directional_time_idx] + direction_args['direction_peak_direction'][directional_time_idx], ) last_directional_time_idx = directional_time_idx diff --git a/fowd/sanity/testcases.py b/fowd/sanity/testcases.py index 63149e4..85bb141 100644 --- a/fowd/sanity/testcases.py +++ b/fowd/sanity/testcases.py @@ -52,8 +52,8 @@ def single_group_test_case(t): def directional_test_case(spectral_params): f = np.linspace(1e-3, 1, 1000) spectrum = ochi_hubble_spectrum(f, **spectral_params) - directional_spread = np.linspace(0, 360 * 4, len(f)) % 360 - mean_direction = np.linspace(360 * 4, 0, len(f)) % 360 + directional_spread = np.linspace(0, 180, len(f)) + mean_direction = np.linspace(360, 0, len(f)) return dict( time=None, @@ -151,5 +151,8 @@ def ochi_component(f, swh, peak_period, shape): DIRECTIONAL_TEST_CASES = { 'swell_dominated_directional': directional_test_case( ochi_params['swell_dominated'], + ), + 'wind_dominated_directional': directional_test_case( + ochi_params['wind_dominated'], ) }